Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/01/06 12:16:42 UTC

[GitHub] [iceberg] pvary opened a new pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

pvary opened a new pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559047654



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -206,33 +207,42 @@ public static PartitionSpec spec(Configuration config) {
    * @param config The target configuration to store to
    * @param table The table which we want to store to the configuration
    */
+
   @VisibleForTesting
-  static void put(Configuration config, Table table) {
-    // The Table contains a FileIO and the FileIO serializes the configuration so we might end up recursively
-    // serializing the objects. To avoid this unset the values for now before serializing.
-    config.unset(InputFormatConfig.SERIALIZED_TABLE);
-    config.unset(InputFormatConfig.FILE_IO);
-    config.unset(InputFormatConfig.LOCATION_PROVIDER);
-    config.unset(InputFormatConfig.ENCRYPTION_MANAGER);
-    config.unset(InputFormatConfig.TABLE_LOCATION);
-    config.unset(InputFormatConfig.TABLE_SCHEMA);
-    config.unset(InputFormatConfig.PARTITION_SPEC);
-
-    String base64Table = table instanceof Serializable ? SerializationUtil.serializeToBase64(table) : null;
-    String base64Io = SerializationUtil.serializeToBase64(table.io());
-    String base64LocationProvider = SerializationUtil.serializeToBase64(table.locationProvider());
-    String base64EncryptionManager = SerializationUtil.serializeToBase64(table.encryption());
-
-    if (base64Table != null) {
-      config.set(InputFormatConfig.SERIALIZED_TABLE, base64Table);
+  static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
+    Properties props = tableDesc.getProperties();
+    Table table = Catalogs.loadTable(configuration, props);
+    String schemaJson = SchemaParser.toJson(table.schema());
+
+    Map<String, String> original = new HashMap<>(map);
+    map.clear();
+
+    map.putAll(Maps.fromProperties(props));
+    map.putAll(original);
+
+    map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(Catalogs.NAME));
+    map.put(InputFormatConfig.TABLE_LOCATION, table.location());
+    map.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
+    map.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(table.spec()));
+    String formatString = PropertyUtil.propertyAsString(table.properties(), DEFAULT_FILE_FORMAT,
+        DEFAULT_FILE_FORMAT_DEFAULT);
+    map.put(InputFormatConfig.WRITE_FILE_FORMAT, formatString.toUpperCase(Locale.ENGLISH));
+    map.put(InputFormatConfig.WRITE_TARGET_FILE_SIZE,
+        table.properties().getOrDefault(WRITE_TARGET_FILE_SIZE_BYTES,
+            String.valueOf(WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT)));
+
+    if (table instanceof Serializable) {
+      map.put(InputFormatConfig.SERIALIZED_TABLE, SerializationUtil.serializeToBase64(table));
     }
 
-    config.set(InputFormatConfig.FILE_IO, base64Io);
-    config.set(InputFormatConfig.LOCATION_PROVIDER, base64LocationProvider);
-    config.set(InputFormatConfig.ENCRYPTION_MANAGER, base64EncryptionManager);
+    map.put(InputFormatConfig.FILE_IO, SerializationUtil.serializeToBase64(table.io()));
+    map.put(InputFormatConfig.LOCATION_PROVIDER, SerializationUtil.serializeToBase64(table.locationProvider()));
+    map.put(InputFormatConfig.ENCRYPTION_MANAGER, SerializationUtil.serializeToBase64(table.encryption()));
+    // We need to remove this otherwise the job.xml will be invalid
+    map.remove("columns.comments");

Review comment:
       Can you explain this a little bit more?






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559625879



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -206,33 +207,42 @@ public static PartitionSpec spec(Configuration config) {
    * @param config The target configuration to store to
    * @param table The table which we want to store to the configuration
    */
+

Review comment:
       Fixed the Javadoc too. I suspect I left the extra blank line in when I started fixing the doc and got distracted :)






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559712783



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {

Review comment:
       The first one is a map-only query and the second is a map-reduce query, so we test both cases.
   Added comments to the tests.






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559624859



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,63 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container>,
+    HiveOutputFormat<NullWritable, Container> {

Review comment:
       Sure thing.
   Sorry for missing this!
   
   Fixed

##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,63 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container>,
+    HiveOutputFormat<NullWritable, Container> {
+
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+
+  @Override
+  @SuppressWarnings("rawtypes")

Review comment:
       Done






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559626416



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -206,33 +207,42 @@ public static PartitionSpec spec(Configuration config) {
    * @param config The target configuration to store to
    * @param table The table which we want to store to the configuration
    */
+
   @VisibleForTesting
-  static void put(Configuration config, Table table) {
-    // The Table contains a FileIO and the FileIO serializes the configuration so we might end up recursively
-    // serializing the objects. To avoid this unset the values for now before serializing.
-    config.unset(InputFormatConfig.SERIALIZED_TABLE);
-    config.unset(InputFormatConfig.FILE_IO);
-    config.unset(InputFormatConfig.LOCATION_PROVIDER);
-    config.unset(InputFormatConfig.ENCRYPTION_MANAGER);
-    config.unset(InputFormatConfig.TABLE_LOCATION);
-    config.unset(InputFormatConfig.TABLE_SCHEMA);
-    config.unset(InputFormatConfig.PARTITION_SPEC);
-
-    String base64Table = table instanceof Serializable ? SerializationUtil.serializeToBase64(table) : null;
-    String base64Io = SerializationUtil.serializeToBase64(table.io());
-    String base64LocationProvider = SerializationUtil.serializeToBase64(table.locationProvider());
-    String base64EncryptionManager = SerializationUtil.serializeToBase64(table.encryption());
-
-    if (base64Table != null) {
-      config.set(InputFormatConfig.SERIALIZED_TABLE, base64Table);
+  static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
+    Properties props = tableDesc.getProperties();
+    Table table = Catalogs.loadTable(configuration, props);
+    String schemaJson = SchemaParser.toJson(table.schema());
+
+    Map<String, String> original = new HashMap<>(map);
+    map.clear();
+
+    map.putAll(Maps.fromProperties(props));
+    map.putAll(original);

Review comment:
       I was not sure which version was more readable. The one you suggested is much nicer!
   Changed.
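
   For readers following the thread: the quoted hunk copies the job map, clears it, adds the table properties, and then re-adds the original entries, so existing job entries take precedence over table properties. The reviewer's actual suggestion is not quoted in this message. The sketch below is only one possible way to get the same precedence, assuming the map holds no null values; `OverlaySketch`, `overlay`, and the sample keys are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class OverlaySketch {
  // Hypothetical helper: copies table properties into the job map,
  // keeping any entry that the job map already defines.
  public static void overlay(Properties props, Map<String, String> map) {
    for (String name : props.stringPropertyNames()) {
      map.putIfAbsent(name, props.getProperty(name));
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("name", "default.customers");
    props.setProperty("location", "from-table");

    Map<String, String> map = new HashMap<>();
    map.put("location", "from-job");

    overlay(props, map);

    // "name" is filled in from the table properties,
    // "location" keeps the value that was already in the job map.
    System.out.println(map.get("name"));
    System.out.println(map.get("location"));
  }
}
```

   This avoids the temporary copy and the `clear()` call, at the cost of treating a key mapped to null as absent.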

##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteArrayOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "arrayofprimitives",
+            Types.ListType.ofRequired(3, Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "arrayofarrays",
+                Types.ListType.ofRequired(3, Types.ListType.ofRequired(4, Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofmaps", Types.ListType
+                .ofRequired(3, Types.MapType.ofRequired(4, 5, Types.StringType.get(),
+                    Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofstructs", Types.ListType.ofRequired(3, Types.StructType
+                .of(required(4, "something", Types.StringType.get()), required(5, "someone",
+                    Types.StringType.get()), required(6, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<?>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofprimitives", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofarrays",
+            Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.ListType.ofRequired(5,
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, List<String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofmaps", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.MapType.ofRequired(5, 6, Types.StringType.get(), Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofstructs", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StructType.of(required(5, "something", Types.StringType.get()),
+                required(6, "someone", Types.StringType.get()),
+                required(7, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, GenericRecord>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofprimitives",
+            Types.StructType.of(required(3, "key", Types.StringType.get()), required(4, "value",
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofarrays", Types.StructType
+            .of(required(3, "names", Types.ListType.ofRequired(4, Types.StringType.get())),
+                required(5, "birthdays", Types.ListType.ofRequired(6,
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofmaps", Types.StructType
+            .of(required(3, "map1", Types.MapType.ofRequired(4, 5,
+                Types.StringType.get(), Types.StringType.get())), required(6, "map2",
+                Types.MapType.ofRequired(7, 8, Types.StringType.get(),
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofstructs", Types.StructType.of(required(3, "struct1", Types.StructType
+            .of(required(4, "key", Types.StringType.get()), required(5, "value",
+                Types.StringType.get()))))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec =
+        PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA).bucket("customer_id", 3).build();
+
+    TableIdentifier identifier = TableIdentifier.of("default", "partitioned_customers");
+
+    shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
+        " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
+        testTables.locationForCreateTableSQL(identifier) +
+        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
+        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
+        "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
+        PartitionSpecParser.toJson(spec) + "', " +
+        "'" + InputFormatConfig.WRITE_FILE_FORMAT + "'='" + fileFormat + "')");
+
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+    StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
+    records.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    Table table = testTables.loadTable(identifier);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
+    String tableName = "complex_table";
+    Table table = testTables.createTable(shell, "complex_table", schema,
+        fileFormat, ImmutableList.of());

Review comment:
       Fixed
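
   A side note on the tests quoted above: `testInsert` and `testPartitionedWrite` build the `INSERT INTO ... VALUES` statement by appending a trailing comma after every row and then trimming it with `setLength`. A minimal sketch of the same idea using `Collectors.joining` follows. It is not part of the PR; `InsertQuerySketch`, `insertInto`, and the sample rows are hypothetical, and rows are modelled as plain lists instead of Iceberg `Record` objects so that the example is self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class InsertQuerySketch {
  // Hypothetical helper: renders three-column rows (number, string, string)
  // into one INSERT statement. Values are not escaped, so this is only
  // suitable for controlled test data.
  public static String insertInto(String table, List<List<Object>> rows) {
    return rows.stream()
        .map(row -> "(" + row.get(0) + ",'" + row.get(1) + "','" + row.get(2) + "')")
        .collect(Collectors.joining(",", "INSERT INTO " + table + " VALUES ", ""));
  }

  public static void main(String[] args) {
    // Illustrative rows only.
    List<List<Object>> rows = Arrays.asList(
        Arrays.<Object>asList(0L, "Alice", "Brown"),
        Arrays.<Object>asList(1L, "Bob", "Green"));

    System.out.println(insertInto("customers", rows));
  }
}
```

   The joining collector places the separator only between rows, so no trimming step is needed. An empty row list would still yield an incomplete statement, so callers would have to guard against that.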






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559047041



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,63 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container>,
+    HiveOutputFormat<NullWritable, Container> {
+
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+
+  @Override
+  @SuppressWarnings("rawtypes")

Review comment:
       With the change above, you can remove this suppression.






[GitHub] [iceberg] pvary commented on pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#issuecomment-756951248


   @rdblue: As discussed today, this would be one of the P1s.
   Could you spare some time to review this, please?
   
   Thanks,
   Peter




[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559625432



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,63 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container>,
+    HiveOutputFormat<NullWritable, Container> {
+
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
+      boolean isCompressed, Properties tableAndSerDeProperties, Progressable progress) {
+
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY));
+    Schema schema = HiveIcebergStorageHandler.schema(jc);
+    PartitionSpec spec = HiveIcebergStorageHandler.spec(jc);
+    FileFormat fileFormat = FileFormat.valueOf(jc.get(InputFormatConfig.WRITE_FILE_FORMAT));
+    long targetFileSize = jc.getLong(InputFormatConfig.WRITE_TARGET_FILE_SIZE, Long.MAX_VALUE);
+    FileIO io = HiveIcebergStorageHandler.io(jc);
+    LocationProvider location = HiveIcebergStorageHandler.location(jc);
+    EncryptionManager encryption = HiveIcebergStorageHandler.encryption(jc);
+    OutputFileFactory outputFileFactory =
+        new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskAttemptID.getTaskID().getId(),
+            taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
+        new GenericAppenderFactory(schema), outputFileFactory, io, targetFileSize, taskAttemptID);
+
+    return writer;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, Container> getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) {
+    throw new UnsupportedOperationException("Please implement if needed");

Review comment:
       I do not have a test for this, but the two methods now share one implementation.






[GitHub] [iceberg] rdblue commented on pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#issuecomment-761694224


   Thanks @pvary! This looks almost ready to go to me. I think we should be able to get it into the 0.11.0 release.




[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559047591



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -206,33 +207,42 @@ public static PartitionSpec spec(Configuration config) {
    * @param config The target configuration to store to
    * @param table The table which we want to store to the configuration
    */
+

Review comment:
       Nit: unnecessary newline? It also separates the method from its Javadoc.








[GitHub] [iceberg] lcspinter commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r553847684



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java
##########
@@ -1044,6 +1046,161 @@ public void testStructOfStructsInTable() throws IOException {
     }
   }
 
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, ImmutableList.of());
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(CUSTOMER_RECORDS);
+    records.addAll(CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(CUSTOMER_RECORDS);
+    records.addAll(CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {

Review comment:
       Do you plan to add the other complexType tests as well?






[GitHub] [iceberg] pvary commented on pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#issuecomment-762698221


   Thanks @rdblue for all of the reviews and helpful comments!




[GitHub] [iceberg] pvary commented on pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#issuecomment-762306092


   Thanks @rdblue for the review! Useful as usual!
   
   Fixed the comments.




[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559050007



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
##########
@@ -188,7 +193,13 @@ private JobConf jobConf(Table table, int taskNum) {
     conf.setNumReduceTasks(0);
     conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID);
 
-    HiveIcebergStorageHandler.put(conf, table);
+    Map<String, String> propMap = new HashMap<>();

Review comment:
       Is this needed because a value may be `null`? If not, we prefer to use `Maps.newHashMap` that enforces non-null keys and values.






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559050110



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteArrayOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "arrayofprimitives",
+            Types.ListType.ofRequired(3, Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "arrayofarrays",
+                Types.ListType.ofRequired(3, Types.ListType.ofRequired(4, Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofmaps", Types.ListType
+                .ofRequired(3, Types.MapType.ofRequired(4, 5, Types.StringType.get(),
+                    Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofstructs", Types.ListType.ofRequired(3, Types.StructType
+                .of(required(4, "something", Types.StringType.get()), required(5, "someone",
+                    Types.StringType.get()), required(6, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<?>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofprimitives", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofarrays",
+            Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.ListType.ofRequired(5,
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, List<String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofmaps", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.MapType.ofRequired(5, 6, Types.StringType.get(), Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofstructs", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StructType.of(required(5, "something", Types.StringType.get()),
+                required(6, "someone", Types.StringType.get()),
+                required(7, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, GenericRecord>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofprimitives",
+            Types.StructType.of(required(3, "key", Types.StringType.get()), required(4, "value",
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofarrays", Types.StructType
+            .of(required(3, "names", Types.ListType.ofRequired(4, Types.StringType.get())),
+                required(5, "birthdays", Types.ListType.ofRequired(6,
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofmaps", Types.StructType
+            .of(required(3, "map1", Types.MapType.ofRequired(4, 5,
+                Types.StringType.get(), Types.StringType.get())), required(6, "map2",
+                Types.MapType.ofRequired(7, 8, Types.StringType.get(),
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofstructs", Types.StructType.of(required(3, "struct1", Types.StructType
+            .of(required(4, "key", Types.StringType.get()), required(5, "value",
+                Types.StringType.get()))))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec =
+        PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA).bucket("customer_id", 3).build();

Review comment:
       Nit: The style is generally to split builder statements on builder method calls, so wrap at `.bucket` and `.build`.
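
As a rough illustration of that wrapping style, here is a minimal, self-contained sketch. It uses `java.util.Locale.Builder` from the JDK purely as a stand-in, since the Iceberg `PartitionSpec` builder is not available outside the project. The names `BuilderWrapStyle` and `buildLocale` are made up for this example.

```java
import java.util.Locale;

public class BuilderWrapStyle {

  // Hypothetical helper: one builder method call per continuation line,
  // each line starting with the dot of the call it carries.
  static Locale buildLocale() {
    return new Locale.Builder()
        .setLanguage("en")
        .setRegion("US")
        .build();
  }

  public static void main(String[] args) {
    System.out.println(buildLocale().toLanguageTag()); // prints "en-US"
  }
}
```

Applied to the statement under review, the same layout would presumably put `.bucket("customer_id", 3)` and `.build()` on their own continuation lines.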






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559661520



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -206,33 +207,42 @@ public static PartitionSpec spec(Configuration config) {
    * @param config The target configuration to store to
    * @param table The table which we want to store to the configuration
    */
+
   @VisibleForTesting
-  static void put(Configuration config, Table table) {
-    // The Table contains a FileIO and the FileIO serializes the configuration so we might end up recursively
-    // serializing the objects. To avoid this unset the values for now before serializing.
-    config.unset(InputFormatConfig.SERIALIZED_TABLE);
-    config.unset(InputFormatConfig.FILE_IO);
-    config.unset(InputFormatConfig.LOCATION_PROVIDER);
-    config.unset(InputFormatConfig.ENCRYPTION_MANAGER);
-    config.unset(InputFormatConfig.TABLE_LOCATION);
-    config.unset(InputFormatConfig.TABLE_SCHEMA);
-    config.unset(InputFormatConfig.PARTITION_SPEC);
-
-    String base64Table = table instanceof Serializable ? SerializationUtil.serializeToBase64(table) : null;
-    String base64Io = SerializationUtil.serializeToBase64(table.io());
-    String base64LocationProvider = SerializationUtil.serializeToBase64(table.locationProvider());
-    String base64EncryptionManager = SerializationUtil.serializeToBase64(table.encryption());
-
-    if (base64Table != null) {
-      config.set(InputFormatConfig.SERIALIZED_TABLE, base64Table);
+  static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
+    Properties props = tableDesc.getProperties();
+    Table table = Catalogs.loadTable(configuration, props);
+    String schemaJson = SchemaParser.toJson(table.schema());
+
+    Map<String, String> original = new HashMap<>(map);
+    map.clear();
+
+    map.putAll(Maps.fromProperties(props));
+    map.putAll(original);
+
+    map.put(InputFormatConfig.TABLE_IDENTIFIER, props.getProperty(Catalogs.NAME));
+    map.put(InputFormatConfig.TABLE_LOCATION, table.location());
+    map.put(InputFormatConfig.TABLE_SCHEMA, schemaJson);
+    map.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(table.spec()));
+    String formatString = PropertyUtil.propertyAsString(table.properties(), DEFAULT_FILE_FORMAT,
+        DEFAULT_FILE_FORMAT_DEFAULT);
+    map.put(InputFormatConfig.WRITE_FILE_FORMAT, formatString.toUpperCase(Locale.ENGLISH));
+    map.put(InputFormatConfig.WRITE_TARGET_FILE_SIZE,
+        table.properties().getOrDefault(WRITE_TARGET_FILE_SIZE_BYTES,
+            String.valueOf(WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT)));
+
+    if (table instanceof Serializable) {
+      map.put(InputFormatConfig.SERIALIZED_TABLE, SerializationUtil.serializeToBase64(table));
     }
 
-    config.set(InputFormatConfig.FILE_IO, base64Io);
-    config.set(InputFormatConfig.LOCATION_PROVIDER, base64LocationProvider);
-    config.set(InputFormatConfig.ENCRYPTION_MANAGER, base64EncryptionManager);
+    map.put(InputFormatConfig.FILE_IO, SerializationUtil.serializeToBase64(table.io()));
+    map.put(InputFormatConfig.LOCATION_PROVIDER, SerializationUtil.serializeToBase64(table.locationProvider()));
+    map.put(InputFormatConfig.ENCRYPTION_MANAGER, SerializationUtil.serializeToBase64(table.encryption()));
+    // We need to remove this otherwise the job.xml will be invalid
+    map.remove("columns.comments");

Review comment:
       Added more comments to explain what is happening






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559048014



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -206,33 +207,42 @@ public static PartitionSpec spec(Configuration config) {
    * @param config The target configuration to store to
    * @param table The table which we want to store to the configuration
    */
+
   @VisibleForTesting
-  static void put(Configuration config, Table table) {
-    // The Table contains a FileIO and the FileIO serializes the configuration so we might end up recursively
-    // serializing the objects. To avoid this unset the values for now before serializing.
-    config.unset(InputFormatConfig.SERIALIZED_TABLE);
-    config.unset(InputFormatConfig.FILE_IO);
-    config.unset(InputFormatConfig.LOCATION_PROVIDER);
-    config.unset(InputFormatConfig.ENCRYPTION_MANAGER);
-    config.unset(InputFormatConfig.TABLE_LOCATION);
-    config.unset(InputFormatConfig.TABLE_SCHEMA);
-    config.unset(InputFormatConfig.PARTITION_SPEC);
-
-    String base64Table = table instanceof Serializable ? SerializationUtil.serializeToBase64(table) : null;
-    String base64Io = SerializationUtil.serializeToBase64(table.io());
-    String base64LocationProvider = SerializationUtil.serializeToBase64(table.locationProvider());
-    String base64EncryptionManager = SerializationUtil.serializeToBase64(table.encryption());
-
-    if (base64Table != null) {
-      config.set(InputFormatConfig.SERIALIZED_TABLE, base64Table);
+  static void overlayTableProperties(Configuration configuration, TableDesc tableDesc, Map<String, String> map) {
+    Properties props = tableDesc.getProperties();
+    Table table = Catalogs.loadTable(configuration, props);
+    String schemaJson = SchemaParser.toJson(table.schema());
+
+    Map<String, String> original = new HashMap<>(map);
+    map.clear();
+
+    map.putAll(Maps.fromProperties(props));
+    map.putAll(original);

Review comment:
       I would prefer not to rely on replacement order here: it requires copying the map, adding the properties, and then adding the copy back, which is convoluted. Technically, you don't even need `map.clear()`, because the properties would overwrite the existing entries and the original copy would then overwrite those again.
   
   I think it would be better to filter properties instead:
   
   ```java
   Maps.fromProperties(props).entrySet().stream()
       .filter(entry -> !map.containsKey(entry.getKey())) // map overrides tableDesc properties
       .forEach(entry -> map.put(entry.getKey(), entry.getValue()));
   ```
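
   As a minimal sketch of the precedence rule, assuming plain `java.util` types in place of Guava's `Maps.fromProperties` (the class `OverlaySketch` and its method names are hypothetical and not part of the PR), both approaches should leave `map` in the same state, with existing `map` entries winning over the `tableDesc` properties:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Properties;

   // Hypothetical helper for illustration only; not code from the PR.
   class OverlaySketch {

     // Approach in the diff: copy the map, clear it, add the properties, then re-add the copy.
     static void overlayByReplacement(Properties props, Map<String, String> map) {
       Map<String, String> original = new HashMap<>(map);
       map.clear();
       for (String name : props.stringPropertyNames()) {
         map.put(name, props.getProperty(name));
       }
       map.putAll(original); // original entries overwrite the properties
     }

     // Suggested approach: only add properties whose key is not already in the map.
     static void overlayByFilter(Properties props, Map<String, String> map) {
       props.stringPropertyNames().stream()
           .filter(name -> !map.containsKey(name)) // map overrides tableDesc properties
           .forEach(name -> map.put(name, props.getProperty(name)));
     }

     public static void main(String[] args) {
       Properties props = new Properties();
       props.setProperty("location", "from-props");
       props.setProperty("format", "parquet");

       Map<String, String> first = new HashMap<>();
       first.put("location", "from-map");
       Map<String, String> second = new HashMap<>(first);

       overlayByReplacement(props, first);
       overlayByFilter(props, second);

       System.out.println(first.equals(second)); // both maps hold the same entries
       System.out.println(second.get("location")); // the map entry is kept
     }
   }
   ```

   The filter variant never removes or rewrites an existing entry, so there is no intermediate state in which `map` is empty.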






[GitHub] [iceberg] lcspinter commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
lcspinter commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r553847684



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java
##########
@@ -1044,6 +1046,161 @@ public void testStructOfStructsInTable() throws IOException {
     }
   }
 
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, ImmutableList.of());
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(CUSTOMER_RECORDS);
+    records.addAll(CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(CUSTOMER_RECORDS);
+    records.addAll(CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {

Review comment:
       Do you plan to add the other complexType tests as well?






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559047030



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,63 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container>,
+    HiveOutputFormat<NullWritable, Container> {

Review comment:
       `Container` is parameterized and we avoid suppressing `rawtypes` because it is dangerous to erase the types at compile time.
   
   It looks like the `HiveIcebergRecordWriter` always casts to `Container<Record>` so I think this should drop the parameter `T` and use `Container<Record>` for the value type:
   
   ```java
   public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, Container<Record>>,
       HiveOutputFormat<NullWritable, Container<Record>> {
     ...
   }
   ```
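
   To illustrate why erasing the type parameter is risky, here is a minimal, self-contained sketch. `Box` is a hypothetical stand-in for a parameterized holder such as `Container`; it is not the Iceberg class, and `RawTypeSketch` is likewise only for illustration. With the raw type, storing the wrong value compiles and only fails later at the cast:

   ```java
   // Hypothetical stand-in for a parameterized holder; not the Iceberg Container class.
   class Box<T> {
     private T value;

     void set(T newValue) {
       this.value = newValue;
     }

     T get() {
       return value;
     }
   }

   class RawTypeSketch {

     // With a raw Box the compiler cannot check what was stored, so the cast fails at runtime.
     @SuppressWarnings({"rawtypes", "unchecked"})
     static boolean rawUseFailsAtRuntime() {
       Box raw = new Box();
       raw.set(42); // compiles, although the reader below expects a String
       try {
         String text = (String) raw.get(); // throws ClassCastException: the value is an Integer
         return false;
       } catch (ClassCastException e) {
         return true;
       }
     }

     // With Box<String>, the same mistake (typed.set(42)) would be rejected by the compiler.
     static String typedUse() {
       Box<String> typed = new Box<>();
       typed.set("record");
       return typed.get();
     }

     public static void main(String[] args) {
       System.out.println(rawUseFailsAtRuntime());
       System.out.println(typedUse());
     }
   }
   ```

   Declaring the value type once on the class, as suggested above, moves this check to compile time and removes the need for `@SuppressWarnings("rawtypes")`.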






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559049366



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {

Review comment:
       Is this test case needed? It seems to duplicate the one above, just with a different write order. Does the order of records matter?






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559047177



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,63 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container>,
+    HiveOutputFormat<NullWritable, Container> {
+
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
+      boolean isCompressed, Properties tableAndSerDeProperties, Progressable progress) {
+
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY));
+    Schema schema = HiveIcebergStorageHandler.schema(jc);
+    PartitionSpec spec = HiveIcebergStorageHandler.spec(jc);
+    FileFormat fileFormat = FileFormat.valueOf(jc.get(InputFormatConfig.WRITE_FILE_FORMAT));
+    long targetFileSize = jc.getLong(InputFormatConfig.WRITE_TARGET_FILE_SIZE, Long.MAX_VALUE);
+    FileIO io = HiveIcebergStorageHandler.io(jc);
+    LocationProvider location = HiveIcebergStorageHandler.location(jc);
+    EncryptionManager encryption = HiveIcebergStorageHandler.encryption(jc);
+    OutputFileFactory outputFileFactory =
+        new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskAttemptID.getTaskID().getId(),
+            taskAttemptID.getId(), jc.get(HiveConf.ConfVars.HIVEQUERYID.varname) + "-" + taskAttemptID.getJobID());
+    HiveIcebergRecordWriter writer = new HiveIcebergRecordWriter(schema, spec, fileFormat,
+        new GenericAppenderFactory(schema), outputFileFactory, io, targetFileSize, taskAttemptID);
+
+    return writer;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, Container> getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) {
+    throw new UnsupportedOperationException("Please implement if needed");

Review comment:
       Is there a reason not to implement this?






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559049457



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteArrayOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "arrayofprimitives",
+            Types.ListType.ofRequired(3, Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "arrayofarrays",
+                Types.ListType.ofRequired(3, Types.ListType.ofRequired(4, Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofmaps", Types.ListType
+                .ofRequired(3, Types.MapType.ofRequired(4, 5, Types.StringType.get(),
+                    Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofstructs", Types.ListType.ofRequired(3, Types.StructType
+                .of(required(4, "something", Types.StringType.get()), required(5, "someone",
+                    Types.StringType.get()), required(6, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<?>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofprimitives", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofarrays",
+            Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.ListType.ofRequired(5,
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, List<String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofmaps", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.MapType.ofRequired(5, 6, Types.StringType.get(), Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofstructs", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StructType.of(required(5, "something", Types.StringType.get()),
+                required(6, "someone", Types.StringType.get()),
+                required(7, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, GenericRecord>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofprimitives",
+            Types.StructType.of(required(3, "key", Types.StringType.get()), required(4, "value",
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofarrays", Types.StructType
+            .of(required(3, "names", Types.ListType.ofRequired(4, Types.StringType.get())),
+                required(5, "birthdays", Types.ListType.ofRequired(6,
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofmaps", Types.StructType
+            .of(required(3, "map1", Types.MapType.ofRequired(4, 5,
+                Types.StringType.get(), Types.StringType.get())), required(6, "map2",
+                Types.MapType.ofRequired(7, 8, Types.StringType.get(),
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofstructs", Types.StructType.of(required(3, "struct1", Types.StructType
+            .of(required(4, "key", Types.StringType.get()), required(5, "value",
+                Types.StringType.get()))))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec =
+        PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA).bucket("customer_id", 3).build();
+
+    TableIdentifier identifier = TableIdentifier.of("default", "partitioned_customers");
+
+    shell.executeStatement("CREATE EXTERNAL TABLE " + identifier +
+        " STORED BY '" + HiveIcebergStorageHandler.class.getName() + "' " +
+        testTables.locationForCreateTableSQL(identifier) +
+        "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
+        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
+        "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
+        PartitionSpecParser.toJson(spec) + "', " +
+        "'" + InputFormatConfig.WRITE_FILE_FORMAT + "'='" + fileFormat + "')");
+
+    List<Record> records = TestHelper.generateRandomRecords(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, 4, 0L);
+
+    StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
+    records.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    Table table = testTables.loadTable(identifier);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  private void testComplexTypeWrite(Schema schema, List<Record> records) throws IOException {
+    String tableName = "complex_table";
+    Table table = testTables.createTable(shell, "complex_table", schema,
+        fileFormat, ImmutableList.of());

Review comment:
       Nit: no need for this newline.






[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559049940



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteArrayOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "arrayofprimitives",
+            Types.ListType.ofRequired(3, Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "arrayofarrays",
+                Types.ListType.ofRequired(3, Types.ListType.ofRequired(4, Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofmaps", Types.ListType
+                .ofRequired(3, Types.MapType.ofRequired(4, 5, Types.StringType.get(),
+                    Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofstructs", Types.ListType.ofRequired(3, Types.StructType
+                .of(required(4, "something", Types.StringType.get()), required(5, "someone",
+                    Types.StringType.get()), required(6, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<?>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofprimitives", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofarrays",
+            Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.ListType.ofRequired(5,
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, List<String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofmaps", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.MapType.ofRequired(5, 6, Types.StringType.get(), Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofstructs", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StructType.of(required(5, "something", Types.StringType.get()),
+                required(6, "someone", Types.StringType.get()),
+                required(7, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, GenericRecord>) r.get(1)).isEmpty()).collect(Collectors.toList());

Review comment:
       Why are these filters needed in the complex type cases?






[GitHub] [iceberg] pvary commented on pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#issuecomment-756167747


   This is the final part of the INSERT INTO implementation for Hive tables.
   @marton-bod, @lcspinter, @rdblue: Could you please take a look?
   
   Thanks,
   Peter




[GitHub] [iceberg] rdblue commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559739650



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {

Review comment:
       Makes sense! Thanks for the additional context.






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559627561



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteArrayOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "arrayofprimitives",
+            Types.ListType.ofRequired(3, Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "arrayofarrays",
+                Types.ListType.ofRequired(3, Types.ListType.ofRequired(4, Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofmaps", Types.ListType
+                .ofRequired(3, Types.MapType.ofRequired(4, 5, Types.StringType.get(),
+                    Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofstructs", Types.ListType.ofRequired(3, Types.StructType
+                .of(required(4, "something", Types.StringType.get()), required(5, "someone",
+                    Types.StringType.get()), required(6, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<?>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofprimitives", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofarrays",
+            Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.ListType.ofRequired(5,
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, List<String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofmaps", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.MapType.ofRequired(5, 6, Types.StringType.get(), Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofstructs", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StructType.of(required(5, "something", Types.StringType.get()),
+                required(6, "someone", Types.StringType.get()),
+                required(7, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, GenericRecord>) r.get(1)).isEmpty()).collect(Collectors.toList());

Review comment:
       The test builds the insert statement with `array()`, which does not work for empty arrays, since Hive does not support that. So we need to filter out the records with empty arrays. I reworded the code; see whether it is easier to read now.
   
   Thanks,
   Peter
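
The filtering described above can be illustrated with a minimal, standalone sketch. It is not the code from this pull request: the class `EmptyArrayFilterSketch` and the helpers `dropEmpty` and `arrayLiteral` are hypothetical names, rows are simplified to plain lists of strings, and the values are assumed to contain no quote characters. It only shows the idea of dropping empty lists before rendering each remaining list as an `array(...)` expression.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper class, for illustration only.
final class EmptyArrayFilterSketch {

  private EmptyArrayFilterSketch() {
  }

  // Keep only the lists that have at least one element, because an empty
  // list cannot be rendered with array() in this approach.
  static List<List<String>> dropEmpty(List<List<String>> rows) {
    return rows.stream()
        .filter(row -> !row.isEmpty())
        .collect(Collectors.toList());
  }

  // Render a non-empty list of strings as array('a','b').
  // Assumption: values contain no single quotes, so no escaping is done.
  static String arrayLiteral(List<String> values) {
    if (values.isEmpty()) {
      throw new IllegalArgumentException("Empty lists must be filtered out first");
    }
    return values.stream()
        .map(value -> "'" + value + "'")
        .collect(Collectors.joining(",", "array(", ")"));
  }

  public static void main(String[] args) {
    List<List<String>> rows = Arrays.asList(
        Arrays.asList("a", "b"),
        Collections.<String>emptyList(),
        Arrays.asList("c"));

    // Only the two non-empty lists are rendered.
    for (List<String> row : dropEmpty(rows)) {
      System.out.println(arrayLiteral(row));
    }
  }
}
```

The explicit check in `arrayLiteral` is a design choice for the sketch: it makes a missed filter fail loudly instead of producing an `array()` expression that the query engine would reject later.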

##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
##########
@@ -188,7 +193,13 @@ private JobConf jobConf(Table table, int taskNum) {
     conf.setNumReduceTasks(0);
     conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID);
 
-    HiveIcebergStorageHandler.put(conf, table);
+    Map<String, String> propMap = new HashMap<>();

Review comment:
       Fixed






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r553994138



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandler.java
##########
@@ -1044,6 +1046,161 @@ public void testStructOfStructsInTable() throws IOException {
     }
   }
 
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, ImmutableList.of());
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(CUSTOMER_RECORDS);
+    records.addAll(CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", CUSTOMER_SCHEMA, fileFormat, CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(CUSTOMER_RECORDS);
+    records.addAll(CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {

Review comment:
       Added more tests.






[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559625879



##########
File path: mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
##########
@@ -206,33 +207,42 @@ public static PartitionSpec spec(Configuration config) {
    * @param config The target configuration to store to
    * @param table The table which we want to store to the configuration
    */
+

Review comment:
       Fixed the javadoc too. I suspect I left the extra line in when I started to fix the doc and got distracted :)






[GitHub] [iceberg] rdblue merged pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038


   





[GitHub] [iceberg] rdblue commented on pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#issuecomment-762411138


   Thanks for the updates, @pvary! I merged this.




[GitHub] [iceberg] pvary commented on a change in pull request #2038: Hive: Implementation for INSERT INTO Iceberg backed Hive tables using the new HiveIcebergRecordWriter

Posted by GitBox <gi...@apache.org>.
pvary commented on a change in pull request #2038:
URL: https://github.com/apache/iceberg/pull/2038#discussion_r559627761



##########
File path: mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
##########
@@ -256,4 +265,293 @@ public void testSelectDistinctFromTable() throws IOException {
       Assert.assertEquals(tableName, size, distinctIds);
     }
   }
+
+  @Test
+  public void testInsert() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, ImmutableList.of());
+
+    // The expected query is like
+    // INSERT INTO customers VALUES (0, 'Alice'), (1, 'Bob'), (2, 'Trudy')
+    StringBuilder query = new StringBuilder().append("INSERT INTO customers VALUES ");
+    HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.forEach(record -> query.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    query.setLength(query.length() - 1);
+
+    shell.executeStatement(query.toString());
+
+    HiveIcebergTestUtils.validateData(table, new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS), 0);
+  }
+
+  @Test
+  public void testInsertFromSelect() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testInsertFromSelectWithOrderBy() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    // We expect that there will be Mappers and Reducers here
+    Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    shell.executeStatement("INSERT INTO customers SELECT * FROM customers ORDER BY customer_id");
+
+    // Check that everything is duplicated as expected
+    List<Record> records = new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    records.addAll(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    HiveIcebergTestUtils.validateData(table, records, 0);
+  }
+
+  @Test
+  public void testWriteArrayOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "arrayofprimitives",
+            Types.ListType.ofRequired(3, Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            required(2, "arrayofarrays",
+                Types.ListType.ofRequired(3, Types.ListType.ofRequired(4, Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofmaps", Types.ListType
+                .ofRequired(3, Types.MapType.ofRequired(4, 5, Types.StringType.get(),
+                    Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteArrayOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema =
+        new Schema(required(1, "id", Types.LongType.get()),
+            required(2, "arrayofstructs", Types.ListType.ofRequired(3, Types.StructType
+                .of(required(4, "something", Types.StringType.get()), required(5, "someone",
+                    Types.StringType.get()), required(6, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((List<?>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofprimitives", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StringType.get())));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, String>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofarrays",
+            Types.MapType.ofRequired(3, 4, Types.StringType.get(), Types.ListType.ofRequired(5,
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, List<String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofmaps", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.MapType.ofRequired(5, 6, Types.StringType.get(), Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, Map<String, String>>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteMapOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "mapofstructs", Types.MapType.ofRequired(3, 4, Types.StringType.get(),
+            Types.StructType.of(required(5, "something", Types.StringType.get()),
+                required(6, "someone", Types.StringType.get()),
+                required(7, "somewhere", Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L).stream()
+        .filter(r -> !((Map<String, GenericRecord>) r.get(1)).isEmpty()).collect(Collectors.toList());
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfPrimitivesInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofprimitives",
+            Types.StructType.of(required(3, "key", Types.StringType.get()), required(4, "value",
+                Types.StringType.get()))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfArraysInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofarrays", Types.StructType
+            .of(required(3, "names", Types.ListType.ofRequired(4, Types.StringType.get())),
+                required(5, "birthdays", Types.ListType.ofRequired(6,
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfMapsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofmaps", Types.StructType
+            .of(required(3, "map1", Types.MapType.ofRequired(4, 5,
+                Types.StringType.get(), Types.StringType.get())), required(6, "map2",
+                Types.MapType.ofRequired(7, 8, Types.StringType.get(),
+                    Types.StringType.get())))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testWriteStructOfStructsInTable() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()),
+        required(2, "structofstructs", Types.StructType.of(required(3, "struct1", Types.StructType
+            .of(required(4, "key", Types.StringType.get()), required(5, "value",
+                Types.StringType.get()))))));
+    List<Record> records = TestHelper.generateRandomRecords(schema, 5, 0L);
+    testComplexTypeWrite(schema, records);
+  }
+
+  @Test
+  public void testPartitionedWrite() throws IOException {
+    Assume.assumeTrue("Tez write is not implemented yet", executionEngine.equals("mr"));
+
+    PartitionSpec spec =
+        PartitionSpec.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA).bucket("customer_id", 3).build();

Review comment:
       Fixed



