Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/15 20:00:07 UTC

[GitHub] [iceberg] rdsr commented on a change in pull request #1192: Implement Hive input format

rdsr commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r454370239



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : schema.select(projectedColumns);
+
+    forwardConfigSettings(job);
+
+    return Arrays.stream(super.getSplits(job, numSplits))
+                 .map(split -> new HiveIcebergSplit((IcebergSplit) split, table.location()))
+                 .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job,
+                                                               Reporter reporter) throws IOException {
+    // Since Hive passes a copy of `job` in `getSplits`, we need to forward the conf settings again.
+    forwardConfigSettings(job);
+    return super.getRecordReader(split, job, reporter);
+  }
+
+  @Override
+  public boolean shouldSkipCombine(Path path, Configuration conf) {
+    return true;
+  }
+
+  /**
+   * Forward configuration settings to the underlying MR input format.
+   */
+  private void forwardConfigSettings(JobConf job) {
+    Preconditions.checkNotNull(table, "Table cannot be null");

Review comment:
       Can these properties ever be null?
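
For context, one way these checks could trip: the fields are transient and assigned only in `getSplits`, so an input format instantiated fresh on the task side would reach `forwardConfigSettings` from `getRecordReader` with all three still null. Below is a hypothetical defensive variant, not the PR's code; whether `resolveTableFromConfiguration` throws `IOException` here is an assumption based on the `getSplits` signature.

    // Hypothetical guard: re-resolve the table instead of failing the precondition.
    private void forwardConfigSettings(JobConf job) {
      if (table == null) {
        try {
          // Assumed to throw IOException, since getSplits() calls it and
          // declares IOException itself.
          table = TableResolver.resolveTableFromConfiguration(job);
          schema = table.schema();
          projection = schema;
        } catch (IOException e) {
          throw new java.io.UncheckedIOException(e);
        }
      }
      job.set(InputFormatConfig.TABLE_PATH, table.location());
      job.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
      job.set(InputFormatConfig.READ_SCHEMA, SchemaParser.toJson(projection));
    }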

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);

Review comment:
       Who is setting these configurations? IcebergStorageHandler?
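
For reference, a rough sketch of how a storage handler could seed these settings; this is a guess at the wiring, not the PR's actual storage handler, and the "location" property key is an assumption.

    import java.util.Map;
    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
    import org.apache.hadoop.hive.ql.plan.TableDesc;
    import org.apache.iceberg.mr.InputFormatConfig;

    // Hypothetical: push the table location from the Hive table definition into
    // the job properties that TableResolver later reads back.
    public class HiveIcebergStorageHandler extends DefaultStorageHandler {
      @Override
      public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
        jobProperties.put(InputFormatConfig.TABLE_PATH,
            tableDesc.getProperties().getProperty("location"));
      }
    }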

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : schema.select(projectedColumns);
+
+    forwardConfigSettings(job);
+
+    return Arrays.stream(super.getSplits(job, numSplits))
+                 .map(split -> new HiveIcebergSplit((IcebergSplit) split, table.location()))
+                 .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job,
+                                                               Reporter reporter) throws IOException {
+    // Since Hive passes a copy of `job` in `getSplits`, we need to forward the conf settings again.
+    forwardConfigSettings(job);
+    return super.getRecordReader(split, job, reporter);
+  }
+
+  @Override
+  public boolean shouldSkipCombine(Path path, Configuration conf) {
+    return true;
+  }
+
+  /**
+   * Forward configuration settings to the underlying MR input format.
+   */
+  private void forwardConfigSettings(JobConf job) {
+    Preconditions.checkNotNull(table, "Table cannot be null");
+    Preconditions.checkNotNull(schema, "Schema cannot be null");
+    Preconditions.checkNotNull(projection, "Projection cannot be null");
+
+    // Once mapred.TableResolver and mapreduce.TableResolver use the same property for the location of the table
+    // (TABLE_LOCATION vs. TABLE_PATH), this line can be removed: see https://github.com/apache/iceberg/issues/1155.
+    job.set(InputFormatConfig.TABLE_PATH, table.location());
+    job.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+    job.set(InputFormatConfig.READ_SCHEMA, SchemaParser.toJson(projection));
+  }
+
+  private static List<String> parseProjectedColumns(Configuration conf) {
+    if (conf == null) {

Review comment:
       It seems like `conf` will not be null here.
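
For context, a plausible reconstruction of `parseProjectedColumns` (its body is not quoted in this thread), inferred from the `ColumnProjectionUtils` import; treat the exact conf key handling as an assumption.

    private static List<String> parseProjectedColumns(Configuration conf) {
      if (conf == null) {
        return Collections.emptyList();
      }
      // READ_COLUMN_NAMES_CONF_STR is Hive's "hive.io.file.readcolumn.names":
      // a comma-separated list of the columns the query actually reads.
      String columns = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "");
      return columns.isEmpty() ? Collections.emptyList() : Arrays.asList(columns.split(","));
    }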

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {

Review comment:
       Is this null check required?
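
For context: `innerReader` is a final field assigned in the constructor, so any reachable reader instance has it non-null. A sketch of `getProgress` without the check, under that reading:

    @Override
    public float getProgress() throws IOException {
      try {
        // innerReader is final and set in the constructor; if reader creation
        // failed, this object was never constructed, so no null check is needed.
        return innerReader.getProgress();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }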

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of `FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {

Review comment:
       Not sure if this is a big win, but can we make `Container<T>` an interface, which could then be implemented by `Mrv1Value` and `HiveIcebergSplit`?
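
A rough sketch of what that suggestion might look like; `Mrv1Value` is the reviewer's name for the value holder, and none of this is in the PR as quoted.

    // Container<T> as an interface...
    public interface Container<T> {
      T get();
      void set(T value);
    }

    // ...implemented by a plain MR v1 value holder...
    public class Mrv1Value<T> implements Container<T> {
      private T value;

      @Override
      public T get() {
        return value;
      }

      @Override
      public void set(T value) {
        this.value = value;
      }
    }

    // ...and, hypothetically, by HiveIcebergSplit itself:
    // public class HiveIcebergSplit extends FileSplit
    //     implements IcebergSplitContainer, Container<IcebergSplit> { ... }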

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {
+        return 0;
+      }
+
+      try {
+        return innerReader.getProgress();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (innerReader != null) {
+        innerReader.close();
+      }
+    }
+  }
+
+  private static JobContext newJobContext(JobConf job) {
+    JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))

Review comment:
       Is `JobContext.ID`, which maps to `mapreduce.job.id`, an MRv2 setting that will not be set for MRv1 jobs?
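
For context, a defensive sketch of `newJobContext` under the reviewer's hypothesis that `mapreduce.job.id` may be unset for MRv1 jobs; the `orElseGet` fallback is an assumption, not necessarily the PR's code.

    private static JobContext newJobContext(JobConf job) {
      // JobID.forName(null) returns null, so the Optional is empty exactly
      // when "mapreduce.job.id" is absent from the conf.
      JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))
                            .orElseGet(JobID::new); // assumed placeholder for MRv1
      return new JobContextImpl(job, jobID);
    }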




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org