You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2020/07/10 01:39:54 UTC

[GitHub] [iceberg] guilload opened a new pull request #1192: Implement Hive input format

guilload opened a new pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192


   This PR implements:
   - a MR v1 input format that wraps the MR v2 input format
   - a Hive input format built on top of the MR v1 input format
   
   A few things:
   - The Hive input format successfully passes the Expedia test suite (see this [branch](https://github.com/guilload/iceberg/tree/guilload--if-all-the-things)). Once this work is merged, @massdosage will open a PR with a test suite based on [HiveRunner](https://github.com/klarna/HiveRunner).
   - I've chosen to separate the MR and Hive implementations that way we can reuse the test suite for the MR input format and keep the Hive specific stuff on its own. In the future, I'd also like to experiment wit other in-memory  representations for Hive so that also allows that.
   - I've taken the `IcebergSplit` class out off `IcebergInputFormat` and the class now extends `mapreduce.InputSplit` and implements `mapred.InputSplit` so it can be returned by both MR v1 and v2 file formats.
   - I'm not a fan of the `IcebergSplitContainer` interface trick that I used to avoid overriding `getRecordReader` in subclasses of `MapredIcebergInputFormat`. Recommendations for a more elegant way to do so are welcome.
   - I've refactored the `TestIcebergInputFormat` class quite a bit so it be can run against the two MR file formats implementations.
   
   @rdblue @rdsr @massdosage @cmathiesen 
   
   cc @edgarRd @gustavoatt


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r453134911



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic Mrv1 InputFormat API for Iceberg.
+ *
+ * @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(getTaskAttemptContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  static TaskAttemptContext getTaskAttemptContext(JobConf job) {
+    TaskAttemptID taskAttemptID = Optional.ofNullable(TaskAttemptID.forName(job.get("mapred.task.id")))
+                                          .orElse(new TaskAttemptID());

Review comment:
       I'm guessing that this is because an attempt context is passed as a JobContext. Let's fix that problem and then we won't need to do this. The helpers I mentioned also demonstrate how to create a TaskAttemptContext from specific values.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r454967221



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : projection.select(projectedColumns);

Review comment:
       ```suggestion
       projection = projectedColumns.isEmpty() ? schema : schema.select(projectedColumns);
   ```
   Shouldn't the code be like above? If not I get a NPE like this when running a HiveRunner test:
   
   ```
          Caused by:
                   java.lang.NullPointerException
                       at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:58)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
                       ... 13 more
   
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue merged pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r454370239



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : projection.select(projectedColumns);
+
+    forwardConfigSettings(job);
+
+    return Arrays.stream(super.getSplits(job, numSplits))
+                 .map(split -> new HiveIcebergSplit((IcebergSplit) split, table.location()))
+                 .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job,
+                                                               Reporter reporter) throws IOException {
+    // Since Hive passes a copy of `job` in `getSplits`, we need to forward the conf settings again.
+    forwardConfigSettings(job);
+    return super.getRecordReader(split, job, reporter);
+  }
+
+  @Override
+  public boolean shouldSkipCombine(Path path, Configuration conf) {
+    return true;
+  }
+
+  /**
+   * Forward configuration settings to the underlying MR input format.
+   */
+  private void forwardConfigSettings(JobConf job) {
+    Preconditions.checkNotNull(table, "Table cannot be null");

Review comment:
       Can these properties ever be null?

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);

Review comment:
       Who is setting these configurations. IcebergStorageHandler? 

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : schema.select(projectedColumns);
+
+    forwardConfigSettings(job);
+
+    return Arrays.stream(super.getSplits(job, numSplits))
+                 .map(split -> new HiveIcebergSplit((IcebergSplit) split, table.location()))
+                 .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job,
+                                                               Reporter reporter) throws IOException {
+    // Since Hive passes a copy of `job` in `getSplits`, we need to forward the conf settings again.
+    forwardConfigSettings(job);
+    return super.getRecordReader(split, job, reporter);
+  }
+
+  @Override
+  public boolean shouldSkipCombine(Path path, Configuration conf) {
+    return true;
+  }
+
+  /**
+   * Forward configuration settings to the underlying MR input format.
+   */
+  private void forwardConfigSettings(JobConf job) {
+    Preconditions.checkNotNull(table, "Table cannot be null");
+    Preconditions.checkNotNull(schema, "Schema cannot be null");
+    Preconditions.checkNotNull(projection, "Projection cannot be null");
+
+    // Once mapred.TableResolver and mapreduce.TableResolver use the same property for the location of the table
+    // (TABLE_LOCATION vs. TABLE_PATH), this line can be removed: see https://github.com/apache/iceberg/issues/1155.
+    job.set(InputFormatConfig.TABLE_PATH, table.location());
+    job.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+    job.set(InputFormatConfig.READ_SCHEMA, SchemaParser.toJson(projection));
+  }
+
+  private static List<String> parseProjectedColumns(Configuration conf) {
+    if (conf == null) {

Review comment:
       seems like conf  will not be null

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {

Review comment:
       is this null check required?

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of `FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {

Review comment:
       Not sure if this is a big win, but can we have the `Container<T>` as an interface which can then be implemented by `Mrv1Value` and `HiveIcebergSplit`

##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {
+        return 0;
+      }
+
+      try {
+        return innerReader.getProgress();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (innerReader != null) {
+        innerReader.close();
+      }
+    }
+  }
+
+  private static JobContext newJobContext(JobConf job) {
+    JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))

Review comment:
       is `JobContext.ID` which maps to `mapreduce.job.id` an MRv2 setting and will not be set for Mrv1 jobs?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-662056963


   @massdosage, yes I have managed to use the input format in a "real" Hive client. I'm taking a shortcut though, I just add `iceberg-mr` to the `iceberg-spark-runtime` subproject dependencies and use that jar with `add jar ...`. I'll try your way. I don't see anything wrong with it. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-657923197


   @rdblue Thanks for the review. I'll make the changes you suggested tomorrow.
   
   @massdosage, please take a look at my latest commit.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r453132952



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of `FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {
+
+  private IcebergSplit innerSplit;
+
+  // The table location of the split allows Hive to map a split to a table and/or partition.
+  // See calls to `getPartitionDescFromPathRecursively` in `CombineHiveInputFormat` or `HiveInputFormat`.
+  private String tableLocation;

Review comment:
       Looks like what's happening is the table location is used as the split's path so that Hive associates all splits with the same `PartitionDesc` that contains a `TableDesc`. Is that correct? If so, I think it would be better to add that as the comment. It's difficult to read the Hive code and figure out what's going on using just the pointers here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r453133848



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic Mrv1 InputFormat API for Iceberg.
+ *
+ * @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(getTaskAttemptContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  static TaskAttemptContext getTaskAttemptContext(JobConf job) {
+    TaskAttemptID taskAttemptID = Optional.ofNullable(TaskAttemptID.forName(job.get("mapred.task.id")))
+                                          .orElse(new TaskAttemptID());

Review comment:
       When would `mapred.task.id` be null? Should we throw an exception in that case?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdsr commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-662567267


   The changes LGTM!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456089150



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : projection.select(projectedColumns);
+
+    forwardConfigSettings(job);
+
+    return Arrays.stream(super.getSplits(job, numSplits))
+                 .map(split -> new HiveIcebergSplit((IcebergSplit) split, table.location()))
+                 .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<Record>> getRecordReader(InputSplit split, JobConf job,
+                                                               Reporter reporter) throws IOException {
+    // Since Hive passes a copy of `job` in `getSplits`, we need to forward the conf settings again.
+    forwardConfigSettings(job);
+    return super.getRecordReader(split, job, reporter);
+  }
+
+  @Override
+  public boolean shouldSkipCombine(Path path, Configuration conf) {
+    return true;
+  }
+
+  /**
+   * Forward configuration settings to the underlying MR input format.
+   */
+  private void forwardConfigSettings(JobConf job) {
+    Preconditions.checkNotNull(table, "Table cannot be null");

Review comment:
       Only if `getRecordReader` is called before `getSplits`. Happy to remove those checks if we judge them overkill.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-662058640


   @guilload OK, thanks, let me know what you find. It has made me think we should probably have this mr module produce an "uber jar" so for the Hive use case you just have to add 1 jar to the classpath, not 6.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-656955915


   @massdosage & @guilload, is the plan to add the `HiveRunner` tests in a separate PR once this is merged? If you want me to, I can merge this and we can fix the comments in a follow-up to unblock next steps.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r453134684



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic Mrv1 InputFormat API for Iceberg.
+ *
+ * @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(getTaskAttemptContext(job))

Review comment:
       `getSplits` accepts a `JobContext` and I think it makes sense to pass objects that are as close as possible to what the mapreduce framework would use. We have some helper methods in our branch for reading Hive tables from Spark's DSv2 that you might want to check out: https://github.com/Netflix/iceberg/blob/netflix-spark-2.4/metacat/src/main/java/com/netflix/iceberg/batch/MapReduceUtil.java.
   
   Those can help you create mapreduce objects after inspecting the mapred objects.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456088524



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);

Review comment:
       Currently, `mapred.TableResolver` copies the SerDe properties in the main configuration when called in `IcebergSerDe`. Down the road, `IcebergStorageHandler` should probably handle this so we can clean this up.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-657690598


   @guilload Thanks for raising the InputFormat PR, I've been taking a look at it today to try get some HiveRunner tests working against it. I checked out the branch from the PR and did the minimal I could to get a test running that inserts some data and then returns it, essentially this:
   
   https://github.com/ExpediaGroup/iceberg/blob/if-all-the-things/mr/src/test/java/org/apache/iceberg/mr/mapred/TestInputFormatWithHadoopTables.java#L86
   
   with some changes to the class of the input format (and ignoring all the other test in that class). Unfortunately this is failing for me with exceptions like so:
   ```
   
             Caused by:
                   java.lang.NullPointerException
                       at java.util.Objects.requireNonNull(Objects.java:203)
                       at com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2296)
                       at com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:111)
                       at com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:54)
                       at org.apache.iceberg.SchemaParser.fromJson(SchemaParser.java:247)
                       at org.apache.iceberg.mr.mapreduce.IcebergInputFormat$IcebergRecordReader.initialize(IcebergInputFormat.java:186)
                       at org.apache.iceberg.mr.mapred.MapredIcebergInputFormat$MapredIcebergRecordReader.<init>(MapredIcebergInputFormat.java:93)
                       at org.apache.iceberg.mr.mapred.MapredIcebergInputFormat.getRecordReader(MapredIcebergInputFormat.java:72)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator$FetchInputFormatSplit.getRecordReader(FetchOperator.java:695)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:333)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
                       ... 13 more
   ```
   Line 186 in the mapreduce input format is doing this:
   
   `this.tableSchema = SchemaParser.fromJson(conf.get(InputFormatConfig.TABLE_SCHEMA));`
   
   How did you manage to get the HiveRunner tests passing? Am I missing something?
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r454586052



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of `FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {
+
+  private IcebergSplit innerSplit;
+
+  // The table location of the split allows Hive to map a split to a table and/or partition.
+  // See calls to `getPartitionDescFromPathRecursively` in `CombineHiveInputFormat` or `HiveInputFormat`.
+  private String tableLocation;

Review comment:
       Hive uses the path name of the split to map it back to a `PartitionDesc` or `TableDesc`, which specify the relevant input format for reading the files belonging to that partition or table. That way, `HiveInputFormat` and `CombineHiveInputFormat` can read files with different input formats in the same MR job and combine compatible splits together.
   
   I'll update the comment.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r454967221



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : projection.select(projectedColumns);

Review comment:
       ```suggestion
       projection = projectedColumns.isEmpty() ? schema : schema.select(projectedColumns);
   ```
   Shouldn't the code be like above? If not I get a NPE like this when running a HiveRunner test:
   
            ```
          Caused by:
                   java.lang.NullPointerException
                       at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:61)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
                       ... 13 more
   
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r458939851



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of `FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {

Review comment:
       I was thinking Mrv1Value would implement Container<T> and HiveIcebergSplit would also implement Container<T>, but that I guess seems not worthwhile for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456100239



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {
+        return 0;
+      }
+
+      try {
+        return innerReader.getProgress();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (innerReader != null) {
+        innerReader.close();
+      }
+    }
+  }
+
+  private static JobContext newJobContext(JobConf job) {
+    JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))

Review comment:
       From what I understand, the `mapred.job.id` and `mapreduce.job.id` properties are set by the callers. In practice, I've seen that Hive (when launching a MR job in local-mode) and YARN set the MRv2 properties only.
   
   I guess the MRv1 properties will be set only if a job is run a MRv1 job, which should be very rare, right? Do we want to cover this case?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-660189350


   > @massdosage Sorry, I broke this branch multiple times in the past few days but this PR is stable and in good shape now. I've added a short HiveRunner test to ensure that this won't happen again. You can extend it or remove it later when you add your own tests.
   > 
   > In addition, you can check out this two commits [10b62fc](https://github.com/apache/iceberg/commit/10b62fc7337eb73d95ed750311112a3b9845ee88) and [f2232b8](https://github.com/apache/iceberg/commit/f2232b80f00c52d56ece7402424287ae4f4f2254). In the former, I branched off of your `if-all-the-things` branch and cherry picked this PR's changes. In the latter, I've modified your tests to run with the "new" `HiveIcebergInputFormat`. The test suite passes successfully.
   > 
   > @rdblue @rdsr I've made the changes you suggested and I believe this PR is ready for a second round of review.
   
   No problem, I took a quick look and the tests you've added look good, I think it makes sense to have them directly in this PR to show that it all works. I don't have time today but I'll try take a proper look on Monday. I'd also like to try out your branch on a Hive client to check everything works end to end on a full Hadoop cluster, not sure if you've also done that yet?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdsr commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdsr commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456806135



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {
+        return 0;
+      }
+
+      try {
+        return innerReader.getProgress();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (innerReader != null) {
+        innerReader.close();
+      }
+    }
+  }
+
+  private static JobContext newJobContext(JobConf job) {
+    JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))

Review comment:
       If `JobContext.ID` is set correctly for Hive then it should be fine!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456093511



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of `FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {

Review comment:
       `Container` being implemented as a class allow to return a generic `Container<T>` in `MapredIcebergInputFormat.createValue()` and express `MapredIcebergInputFormat<T>` as a generic `InputFormat<Void, Container<T>>`.  I used [DeprecatedParquetInputFormat](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java) as a reference. 
   
   Can we do that with an interface?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r454967221



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+  private transient Schema projection;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    List<String> projectedColumns = parseProjectedColumns(job);
+    projection = projectedColumns.isEmpty() ? schema : projection.select(projectedColumns);

Review comment:
       ```suggestion
       projection = projectedColumns.isEmpty() ? schema : schema.select(projectedColumns);
   ```
   Shouldn't the code be like above? If not I get a NPE like this when running a HiveRunner test:
   
   ```
          Caused by:
                   java.lang.NullPointerException
                       at org.apache.iceberg.mr.hive.HiveIcebergInputFormat.getSplits(HiveIcebergInputFormat.java:61)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextSplits(FetchOperator.java:372)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getRecordReader(FetchOperator.java:304)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:459)
                       ... 13 more
   
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r453133045



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
##########
@@ -21,48 +21,33 @@
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.IOException;
 import org.apache.hadoop.io.Writable;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.Record;
 
 /**
- * Wraps an Iceberg Record in a Writable which Hive can use in the SerDe.
+ * A simple container of objects that you can get and set.
+ *
+ * @param <T> the Java type of the object held by this container
  */
-public class IcebergWritable implements Writable {
-
-  private Record record;
-  private Schema schema;
-
-  public IcebergWritable(Record record, Schema schema) {
-    this.record = record;
-    this.schema = schema;
-  }
+public class Container<T> implements Writable {
 
-  @SuppressWarnings("checkstyle:HiddenField")
-  public void wrapRecord(Record record) {
-    this.record = record;
-  }
-
-  public Record record() {
-    return record;
-  }
+  private T value;
 
-  public Schema schema() {
-    return schema;
+  public T get() {
+    return value;
   }
 
-  @SuppressWarnings("checkstyle:HiddenField")
-  public void wrapSchema(Schema schema) {
-    this.schema = schema;
+  public void set(T newValue) {
+    this.value = newValue;
   }
 
   @Override
-  public void write(DataOutput dataOutput) {
-    throw new UnsupportedOperationException("write is not supported.");
+  public void readFields(DataInput in) throws IOException {
+    throw new UnsupportedOperationException("readFields is not supported");

Review comment:
       I don't think these last two functions need to change?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456096635



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
##########
@@ -100,11 +94,6 @@
 
   @Override
   public List<InputSplit> getSplits(JobContext context) {
-    if (splits != null) {

Review comment:
       Hive keeps a cache of input format instance arounds (see [`HiveInputFormat`](https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java#L112)) that breaks this logic so I've chosen to remove it for now.
   
   We can re-implement this later but the logic will have to be a bit more robust.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-661155466


   The tests look good and work for me. I'm now trying to get them running from a Hive client. I've built all the Iceberg jars and uploaded them to the host with the Hive client. I've then created a table using Iceberg from Spark. I've now opened a Hive session and done the following to add the jars to the classpath:
   ```
   add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-api.jar;
   add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-core.jar;
   add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-data.jar;
   add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-mr.jar;
   add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-parquet.jar;
   add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar;
   ```
   I get what look like successful responses from Hive at this point:
   ```
   hive> add jar /home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar;
   Added [/home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar] to class path
   Added resources: [/home/hadoop/iceberg/0.9.0-SNAPSHOT/iceberg-bundled-guava.jar]
   ```
   I then try create a table something like so:
   ```
   CREATE EXTERNAL TABLE default.hiveberg_table_a_guilload 
   ROW FORMAT SERDE 'org.apache.iceberg.mr.mapred.IcebergSerDe' 
   STORED AS 
   INPUTFORMAT 'org.apache.iceberg.mr.hive.HiveIcebergInputFormat' 
   OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' 
   LOCATION 'hdfs://path/to/spark-created/table_a';
   ```
   However I then see this error in the Hive user logs:
   `2020-07-20T16:20:29,298 ERROR [759dc8f9-310b-4889-a9db-a48cf4c1c424 main([])]: exec.DDLTask (DDLTask.java:failed(639)) - java.lang.NoClassDefFoundError: org/apache/iceberg/relocated/com/google/common/base/Preconditions`
   which is very strange since that class should be there. I've run out of time to look further today, will carry on later in the week. @guilload have you managed to get it running end to end in Hive "for reals"? If so, did you do any steps differently to what I'm doing?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-659718716


   @massdosage Sorry, I broke this branch multiple times in the past few days but this PR is stable and in good shape now. I've added a short HiveRunner test to ensure that this won't happen again. You can extend it or remove it later when you add your own tests.
   
   In addition, you can check out this two commits 10b62fc7337eb73d95ed750311112a3b9845ee88 and f2232b80f00c52d56ece7402424287ae4f4f2254. In the former,  I branched off of your `if-all-the-things` branch and cherry picked this PR's changes. In the latter, I've modified your tests to run with the "new" `HiveIcebergInputFormat`. The test suite passes successfully.
   
   @rdblue @rdsr I've made the changes you suggested and I believe this PR is ready for a second round of review.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] HotSushi commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
HotSushi commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r458440578



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.StandaloneHiveRunner;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.mapred.IcebergSerDe;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(StandaloneHiveRunner.class)
+public class TestHiveIcebergInputFormat {
+
+  @HiveSQL(files = {}, autoStart = true)
+  private HiveShell shell;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Schema CUSTOMER_SCHEMA = new Schema(
+          required(1, "customer_id", Types.LongType.get()),
+          required(2, "first_name", Types.StringType.get())
+  );
+
+  private static final List<Record> CUSTOMER_RECORDS = TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA)
+          .add(0L, "Alice")
+          .add(1L, "Bob")
+          .add(2L, "Trudy")
+          .build();
+
+  private static final Schema ORDER_SCHEMA = new Schema(
+          required(1, "order_id", Types.LongType.get()),
+          required(2, "customer_id", Types.LongType.get()),
+          required(3, "total", Types.DoubleType.get()));
+
+  private static final List<Record> ORDER_RECORDS = TestHelper.RecordsBuilder.newInstance(ORDER_SCHEMA)
+          .add(100L, 0L, 11.11d)
+          .add(101L, 0L, 22.22d)
+          .add(102L, 1L, 33.33d)
+          .build();
+
+  // before variables
+  private HadoopTables tables;
+  private Table customerTable;
+  private Table orderTable;
+
+  @Before
+  public void before() throws IOException {
+    Configuration conf = new Configuration();
+    tables = new HadoopTables(conf);
+
+    File customerLocation = temp.newFolder("customers");
+    Assert.assertTrue(customerLocation.delete());
+
+    TestHelper customerHelper = new TestHelper(
+            conf, tables, CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), FileFormat.PARQUET, temp, customerLocation);
+
+    customerTable = customerHelper.createUnpartitionedTable();
+    customerHelper.appendToTable(customerHelper.writeFile(null, CUSTOMER_RECORDS));
+
+    File orderLocation = temp.newFolder("orders");
+    Assert.assertTrue(orderLocation.delete());
+
+    TestHelper orderHelper = new TestHelper(
+            conf, tables, ORDER_SCHEMA, PartitionSpec.unpartitioned(), FileFormat.PARQUET, temp, orderLocation);
+
+    orderTable = orderHelper.createUnpartitionedTable();
+    orderHelper.appendToTable(orderHelper.writeFile(null, ORDER_RECORDS));
+  }
+
+  @Test
+  public void testScanEmptyTable() throws IOException {
+    File emptyLocation = temp.newFolder("empty");
+    Assert.assertTrue(emptyLocation.delete());
+
+    Schema emptySchema = new Schema(required(1, "empty", Types.StringType.get()));
+    Table emptyTable = tables.create(
+            emptySchema, PartitionSpec.unpartitioned(), Collections.emptyMap(), emptyLocation.toString());
+    createHiveTable("empty", emptyTable.location());
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM default.empty");
+    Assert.assertEquals(0, rows.size());
+  }
+
+  @Test
+  public void testScanTable() {
+    createHiveTable("customers", customerTable.location());
+
+    // Single fetch task: no MR job.
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM default.customers");
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[] {0L, "Alice"}, rows.get(0));
+    Assert.assertArrayEquals(new Object[] {1L, "Bob"}, rows.get(1));
+    Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, rows.get(2));
+
+    // Adding the ORDER BY clause will cause Hive to spawn a local MR job this time.
+    List<Object[]> descRows = shell.executeStatement("SELECT * FROM default.customers ORDER BY customer_id DESC");
+
+    Assert.assertEquals(3, rows.size());

Review comment:
       This should be `Assert.assertEquals(3, descRows.size());`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-662573973


   Thanks for reviewing, @rdsr!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456121465



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic Mrv1 InputFormat API for Iceberg.
+ *
+ * @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(getTaskAttemptContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  static TaskAttemptContext getTaskAttemptContext(JobConf job) {
+    TaskAttemptID taskAttemptID = Optional.ofNullable(TaskAttemptID.forName(job.get("mapred.task.id")))
+                                          .orElse(new TaskAttemptID());

Review comment:
       I looked into this and Hive (when spawning a local MR job) or YARN populates the `mapreduce.task.id` and `mapreduce.task.attempt.id"` (among many other properties). So I changed this line to: `TaskAttemptID.forName(job.get("mapreduce.task.attempt.id")`. Several input formats in the Hive codebase do the same.
   
   I believe it is not the responsibility of the input format to create a `TaskAttemptID` from scratch and setting the `mapred{uce}*` properties. The framework using the input format are responsible for that.
   
   During my tests, I've seen that the task attempt it is not set only when Hive uses the the file input format outside of a MR job (single fetch task). This is when we need to fallback to the default constructor.
   
   The same logic applies for the `JobContext` object.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456093788



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {

Review comment:
       Probably not, will remove.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r453133458



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic Mrv1 InputFormat API for Iceberg.
+ *
+ * @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records

Review comment:
       Could you update this to `Java class of records constructed by Iceberg; default is {@link Record}`?
   
   It is odd that this currently states that T could be either A or B, but defaults to C.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-657651234


   > @massdosage & @guilload, is the plan to add the `HiveRunner` tests in a separate PR once this is merged? If you want me to, I can merge this and we can fix the comments in a follow-up to unblock next steps.
   
   I'm taking a look at this now, but yes, I should be able to add the `HiveRunner` steps in a fast-follow to this PR. I'd also like @cmathiesen to take a look at this and comment on what impact these changes have on the features we were going to move over next from [Hiveberg](https://github.com/ExpediaGroup/hiveberg) like various pushdowns and exposing system tables which we have ready against the other InputFormat.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-656955758


   Looks good to me. I made a few comments, but I don't think there is anything major to fix. It would be nice to fix some of the comments and to instantiate the class, `JobContext`, for `getSplits`. Otherwise this looks good and we should be able to commit it soon.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r453133458



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic Mrv1 InputFormat API for Iceberg.
+ *
+ * @param <T> T is the in memory data model which can either be Pig tuples, Hive rows. Default is Iceberg records

Review comment:
       Could you update this to "Java class of records constructed by Iceberg; default is {@link Record}"?
   
   It is odd that this currently states that T could be either A or B, but defaults to C.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456100239



##########
File path: mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job,
+                                                          Reporter reporter) throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> innerReader;
+    private final long splitLength; // for getPos()
+
+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (innerReader == null) {
+        return 0;
+      }
+
+      try {
+        return innerReader.getProgress();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (innerReader != null) {
+        innerReader.close();
+      }
+    }
+  }
+
+  private static JobContext newJobContext(JobConf job) {
+    JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))

Review comment:
       From what I understand, the `mapred.job.id` and `mapreduce.job.id` properties are set by the callers. In practice, I've seen that Hive (when launching a MR job in local-mode) and YARN set the MRv2 properties only.
   
   I guess the MRv1 properties will be set only if the job is run as a MRv1 job, which should be very rare, right? Do we want to cover this case?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] massdosage commented on pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
massdosage commented on pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#issuecomment-658708430


   > @rdblue Thanks for the review. I'll make the changes you suggested tomorrow.
   > 
   > @massdosage, please take a look at my latest commit.
   
   I merged in your changes from yesterday, I've added a comment above on a NPE I ran into. I then also get the following error when I run the equivalent of [this test](https://github.com/ExpediaGroup/iceberg/blob/if-all-the-things/mr/src/test/java/org/apache/iceberg/mr/mapred/TestInputFormatWithEmptyTable.java):
   
   ```
   org.apache.iceberg.mr.hive.TestHiveIcebergInputFormat > emptyTable FAILED
       java.lang.IllegalArgumentException: Failed to executeQuery Hive query SELECT id, data FROM source_db.table_a: java.io.IOException: java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
           at com.klarna.hiverunner.HiveServerContainer.executeStatement(HiveServerContainer.java:147)
           at com.klarna.hiverunner.builder.HiveShellBase.executeStatementsWithCommandShellEmulation(HiveShellBase.java:115)
           at com.klarna.hiverunner.builder.HiveShellBase.executeStatementWithCommandShellEmulation(HiveShellBase.java:109)
           at com.klarna.hiverunner.builder.HiveShellBase.executeStatement(HiveShellBase.java:99)
           at org.apache.iceberg.mr.hive.TestHiveIcebergInputFormat.emptyTable(TestHiveIcebergInputFormat.java:197)
   
           Caused by:
           org.apache.hive.service.cli.HiveSQLException: java.io.IOException: java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
               at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:499)
               at org.apache.hive.service.cli.operation.OperationManager.getOperationNextRowSet(OperationManager.java:307)
               at org.apache.hive.service.cli.session.HiveSessionImpl.fetchResults(HiveSessionImpl.java:878)
               at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:559)
               at org.apache.hive.service.cli.CLIService.fetchResults(CLIService.java:551)
               at com.klarna.hiverunner.HiveServerContainer.executeStatement(HiveServerContainer.java:129)
               ... 4 more
   
               Caused by:
               java.io.IOException: java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
                   at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:521)
                   at org.apache.hadoop.hive.ql.exec.FetchOperator.pushRow(FetchOperator.java:428)
                   at org.apache.hadoop.hive.ql.exec.FetchTask.fetch(FetchTask.java:147)
                   at org.apache.hadoop.hive.ql.Driver.getResults(Driver.java:2208)
                   at org.apache.hive.service.cli.operation.SQLOperation.getNextRowSet(SQLOperation.java:494)
                   ... 9 more
   
                   Caused by:
                   java.lang.ClassCastException: org.apache.iceberg.mr.mapred.Container cannot be cast to org.apache.hadoop.io.BinaryComparable
                       at org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.doDeserialize(LazySimpleSerDe.java:151)
                       at org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe.deserialize(AbstractEncodingAwareSerDe.java:76)
                       at org.apache.hadoop.hive.ql.exec.FetchOperator.getNextRow(FetchOperator.java:502)
                       ... 13 more
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] guilload commented on a change in pull request #1192: Implement Hive input format

Posted by GitBox <gi...@apache.org>.
guilload commented on a change in pull request #1192:
URL: https://github.com/apache/iceberg/pull/1192#discussion_r456093511



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of `FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements IcebergSplitContainer {

Review comment:
       `Container` being implemented as a class allows to return a generic `Container<T>` in `MapredIcebergInputFormat.createValue()` and express `MapredIcebergInputFormat<T>` as a generic `InputFormat<Void, Container<T>>`.  I used [DeprecatedParquetInputFormat](https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java) as a reference. 
   
   Can we do that with an interface?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org