Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/09/20 02:21:18 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3413: [HUDI-2277] HoodieDeltaStreamer reading ORC files directly using ORCDFSSource

nsivabalan commented on a change in pull request #3413:
URL: https://github.com/apache/hudi/pull/3413#discussion_r711845020



##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
##########
@@ -1398,6 +1399,34 @@ private void testParquetDFSSource(boolean useSchemaProvider, List<String> transf
     testNum++;
   }
 
+  private void testORCDFSSource(boolean useSchemaProvider, List<String> transformerClassNames) throws Exception {
+    // prepare ORCDFSSource
+    TypedProperties orcProps = new TypedProperties();
+
+    // Properties used for testing delta-streamer with orc source
+    orcProps.setProperty("include", "base.properties");
+    orcProps.setProperty("hoodie.embed.timeline.server","false");
+    orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/" + "source.avsc");
+      if (transformerClassNames != null) {
+        orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/" + "target.avsc");
+      }
+    }
+    orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
+    UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_ORC);
+
+    String tableBasePath = dfsBasePath + "/test_orc_source_table" + testNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+            TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ORCDFSSource.class.getName(),
+                    transformerClassNames, PROPS_FILENAME_TEST_ORC, false,
+                    useSchemaProvider, 100000, false, null, null, "timestamp", null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(ORC_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);

Review comment:
       Shouldn't this be *.orc instead of *.parquet?

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
##########
@@ -364,5 +385,32 @@ public static String toJsonString(HoodieRecord hr) {
     public static String[] jsonifyRecords(List<HoodieRecord> records) {
       return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
     }
+
+    public static void addAvroRecord(
+            VectorizedRowBatch batch,
+            GenericRecord record,
+            TypeDescription orcSchema,
+            int orcBatchSize,
+            Writer writer
+    ) throws IOException {
+      for (int c = 0; c < batch.numCols; c++) {
+        ColumnVector colVector = batch.cols[c];
+        final String thisField = orcSchema.getFieldNames().get(c);
+        final TypeDescription type = orcSchema.getChildren().get(c);
+
+        Object fieldValue = record.get(thisField);
+        Schema.Field avroField = record.getSchema().getField(thisField);
+        AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size);
+      }
+
+      batch.size++;
+
+      if (batch.size % orcBatchSize == 0 || batch.size == batch.getMaxSize()) {

Review comment:
       Can you help me understand what this code block is doing? I see this method is called for one batch of records. Let's say there are 100 records in one batch. If I am not wrong, batch.size at the end of adding 100 records should be 100, and so batch.size % orcBatchSize will be equal to 0 only after adding all records. 
   If that's the case, shouldn't we move this block outside of this method? 
   Or am I missing something? 
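   To illustrate the restructure I have in mind, here is a rough sketch (method and variable names are hypothetical; it assumes the caller owns the Writer and the VectorizedRowBatch and loops over the records itself):

       // Sketch only: fill the batch one record at a time; no flushing in here.
       private static void addAvroRecord(VectorizedRowBatch batch, GenericRecord record, TypeDescription orcSchema) {
         for (int c = 0; c < batch.numCols; c++) {
           ColumnVector colVector = batch.cols[c];
           String fieldName = orcSchema.getFieldNames().get(c);
           TypeDescription type = orcSchema.getChildren().get(c);
           Object fieldValue = record.get(fieldName);
           Schema.Field avroField = record.getSchema().getField(fieldName);
           AvroOrcUtils.addToVector(type, colVector, avroField.schema(), fieldValue, batch.size);
         }
         batch.size++;
       }

       // Sketch only: the caller flushes when the batch is full and once more at the end.
       private static void writeAvroRecords(List<GenericRecord> records, VectorizedRowBatch batch,
                                            TypeDescription orcSchema, Writer writer) throws IOException {
         for (GenericRecord record : records) {
           addAvroRecord(batch, record, orcSchema);
           if (batch.size == batch.getMaxSize()) {
             writer.addRowBatch(batch);
             batch.reset();
           }
         }
         if (batch.size > 0) {
           writer.addRowBatch(batch);
           batch.reset();
         }
       }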

##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
##########
@@ -364,5 +385,32 @@ public static String toJsonString(HoodieRecord hr) {
     public static String[] jsonifyRecords(List<HoodieRecord> records) {
       return records.stream().map(Helpers::toJsonString).toArray(String[]::new);
     }
+
+    public static void addAvroRecord(

Review comment:
       Does this need to be public? Can we switch it to private?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org