Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/05 18:19:31 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #2400: [WIP] Some fixes to test suite framework. Adding clustering node

nsivabalan commented on a change in pull request #2400:
URL: https://github.com/apache/hudi/pull/2400#discussion_r552072132



##########
File path: docker/demo/config/test-suite/complex-dag-cow.yaml
##########
@@ -14,41 +14,47 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: cow-long-running-example.yaml
-dag_rounds: 2
+dag_rounds: 1

Review comment:
       Please skip reviewing these two files for now, complex-dag-cow.yaml and complex-dag-mor.yaml; I have yet to fix them.

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
##########
@@ -111,17 +111,19 @@ public void execute(ExecutionContext context) throws Exception {
       throw new AssertionError("Hudi contents does not match contents input data. ");
     }
 
-    String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
-    String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
-    log.warn("Validating hive table with db : " + database + " and table : " + tableName);
-    Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
-    Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
-        .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
-    intersectionDf = inputSnapshotDf.intersect(trimmedDf);
-    // the intersected df should be same as inputDf. if not, there is some mismatch.
-    if (inputSnapshotDf.except(intersectionDf).count() != 0) {
-      log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
-      throw new AssertionError("Hudi hive table contents does not match contents input data. ");
+    if (config.isValidateHive()) {

Review comment:
       No changes here except adding this if condition.
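The validation in this node treats the input snapshot as the source of truth. It intersects the input with the rows read back from the table and fails if any input row is missing from that intersection. Below is a minimal sketch of the same set logic, using plain Java collections instead of Spark `Dataset<Row>`. It assumes rows can be compared as strings, and the class `SnapshotCheck` and method `missingFromTable` are hypothetical names used only for illustration.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the intersect/except validation in ValidateDatasetNode.
// Rows are modeled as strings; Spark's intersect/except also de-duplicate, so sets fit.
class SnapshotCheck {

  // Returns input rows absent from the table: input EXCEPT (input INTERSECT table).
  static Set<String> missingFromTable(List<String> inputRows, List<String> tableRows) {
    Set<String> intersection = new LinkedHashSet<>(inputRows);
    intersection.retainAll(new LinkedHashSet<>(tableRows)); // input INTERSECT table
    Set<String> missing = new LinkedHashSet<>(inputRows);
    missing.removeAll(intersection);                       // input EXCEPT intersection
    return missing;
  }

  public static void main(String[] args) {
    List<String> input = List.of("k1", "k2", "k3");
    List<String> table = List.of("k1", "k2", "k3", "k4"); // the extra row "k4" is not flagged
    Set<String> missing = missingFromTable(input, table);
    if (!missing.isEmpty()) {
      throw new AssertionError("Validation failed, missing rows: " + missing);
    }
    System.out.println("validation passed");
  }
}
```

Because only input rows missing from the intersection are counted, extra rows in the table do not fail the check. The check verifies that the input is contained in the table, not that the two are equal.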

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java
##########
@@ -41,21 +46,17 @@
   private GenericRecord lastRecord;
   // Partition path field name
   private Set<String> partitionPathFieldNames;
-  private String firstPartitionPathField;
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, String schema) {
     this(maxEntriesToProduce, GenericRecordFullPayloadGenerator.DEFAULT_PAYLOAD_SIZE, schema, null, 0);
   }
 
   public FlexibleSchemaRecordGenerationIterator(long maxEntriesToProduce, int minPayloadSize, String schemaStr,
-      List<String> partitionPathFieldNames, int numPartitions) {
+      List<String> partitionPathFieldNames, int partitionIndex) {

Review comment:
       All changes in this file are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -127,9 +127,13 @@ public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, Sp
     return ws;
   }
 
+  public int getBatchId() {
+    return this.batchId;
+  }
+
   public JavaRDD<GenericRecord> generateInserts(Config operation) {
     int numPartitions = operation.getNumInsertPartitions();
-    long recordsPerPartition = operation.getNumRecordsInsert();
+    long recordsPerPartition = operation.getNumRecordsInsert() / numPartitions;

Review comment:
       These changes are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.
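After this change, the per-partition count is `getNumRecordsInsert() / numPartitions`. That is integer division, so any remainder is dropped and the job can generate slightly fewer records than configured. Here is a small worked example. The values 1000 and 3, and the class `InsertSplit`, are hypothetical.

```java
// Hypothetical values; shows how integer division in generateInserts splits the total.
class InsertSplit {

  // Records each partition generates: configured total divided by the partition count.
  static long recordsPerPartition(long numRecordsInsert, int numPartitions) {
    return numRecordsInsert / numPartitions; // integer division, remainder discarded
  }

  // Total actually generated across all partitions.
  static long totalGenerated(long numRecordsInsert, int numPartitions) {
    return recordsPerPartition(numRecordsInsert, numPartitions) * numPartitions;
  }

  public static void main(String[] args) {
    long configured = 1000;
    int partitions = 3;
    System.out.println("per partition: " + recordsPerPartition(configured, partitions));
    System.out.println("total generated: " + totalGenerated(configured, partitions));
  }
}
```

With these values each partition generates 333 records, for 999 in total. Before this change (the removed line), every partition generated the full getNumRecordsInsert() count, so the same configuration would have produced 3000 records.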

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java
##########
@@ -140,7 +144,7 @@ public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, Sp
     JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
         .mapPartitionsWithIndex((index, p) -> {
           return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
-              minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions));
+            minPayloadSize, schemaStr, partitionPathFieldNames, (Integer)index));

Review comment:
       These changes are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.
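With this change, the generator built inside mapPartitionsWithIndex receives that partition's own index instead of the shared numPartitions value, so each partition's iterator can tell itself apart from the others. Below is a plain-Java simulation of that pattern, without Spark. It assumes the generator simply tags its records with the index. How FlexibleSchemaRecordGenerationIterator actually uses partitionIndex is not shown in this diff, and `PartitionedGeneration` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of jsc.parallelize(...).mapPartitionsWithIndex(...) in plain Java.
class PartitionedGeneration {

  // Hypothetical per-partition generator: tags each record with the index it was given.
  static List<String> generatePartition(int partitionIndex, long recordsPerPartition) {
    List<String> records = new ArrayList<>();
    for (long seq = 0; seq < recordsPerPartition; seq++) {
      records.add("partition=" + partitionIndex + ",seq=" + seq);
    }
    return records;
  }

  // Stand-in for mapPartitionsWithIndex: invokes the generator once per index 0..n-1.
  static List<String> generateAll(int numPartitions, long recordsPerPartition) {
    List<String> all = new ArrayList<>();
    for (int index = 0; index < numPartitions; index++) {
      all.addAll(generatePartition(index, recordsPerPartition));
    }
    return all;
  }

  public static void main(String[] args) {
    generateAll(2, 2).forEach(System.out::println);
  }
}
```

Passing numPartitions would have handed the same value to every partition. Passing the index gives each partition a distinct input.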

##########
File path: hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/GenericRecordFullPayloadGenerator.java
##########
@@ -46,9 +46,9 @@
  */
 public class GenericRecordFullPayloadGenerator implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);
+  private static Logger LOG = LoggerFactory.getLogger(GenericRecordFullPayloadGenerator.class);

Review comment:
       All changes in this file are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/generator/TestGenericRecordPayloadGenerator.java
##########
@@ -134,38 +133,4 @@ public void testComplexPayloadWithLargeMinSize() throws Exception {
     assertTrue(HoodieAvroUtils.avroToBytes(record).length < minPayloadSize + 0.1 * minPayloadSize);
   }
 
-  @Test

Review comment:
       These changes are part of reverting commit e33a8f733c4a9a94479c166ad13ae9d53142cd3f.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -307,7 +307,10 @@ public void refreshTimeline() throws IOException {
           if (!commitMetadata.getMetadata(CHECKPOINT_KEY).isEmpty()) {
             resumeCheckpointStr = Option.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
           }
-        }  else if (HoodieTimeline.compareTimestamps(HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
+        } else if (commitMetadata.getOperationType() == WriteOperationType.CLUSTER) {
+          // incase of CLUSTER commit, no checkpoint will be available in metadata.

Review comment:
       @n3nash @satishkotha: after clustering, when DeltaStreamer calls readFromSource, execution was falling into lines 314 to 316, so I added this condition to return an empty checkpoint. Can you confirm this looks OK?
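Below is a simplified model of the branch order after this change: use the checkpoint stored in the last commit if there is one, return no checkpoint for a CLUSTER commit, and otherwise fall through to the branch that previously caught clustering commits. It rests on several assumptions. The enclosing condition (not visible in the hunk) is assumed to check that the checkpoint entry is present. The fallback branch is modeled as an exception, which omits the real FULL_BOOTSTRAP_INSTANT_TS timestamp comparison. `CheckpointResolution`, `OperationType`, `CommitInfo`, `resolveCheckpoint` and the key string are hypothetical, not Hudi APIs.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the checkpoint branch in DeltaSync#refreshTimeline.
class CheckpointResolution {

  enum OperationType { INSERT, UPSERT, CLUSTER }

  static final String CHECKPOINT_KEY = "checkpoint.key"; // hypothetical key name

  static final class CommitInfo {
    final OperationType operationType;
    final Map<String, String> metadata;

    CommitInfo(OperationType operationType, Map<String, String> metadata) {
      this.operationType = operationType;
      this.metadata = metadata;
    }
  }

  static Optional<String> resolveCheckpoint(CommitInfo commit) {
    String checkpoint = commit.metadata.get(CHECKPOINT_KEY);
    if (checkpoint != null) {
      // Assumed enclosing branch: an empty stored checkpoint means "no resume checkpoint".
      return checkpoint.isEmpty() ? Optional.empty() : Optional.of(checkpoint);
    } else if (commit.operationType == OperationType.CLUSTER) {
      // Clustering commits carry no checkpoint in their metadata, so resume without one.
      return Optional.empty();
    }
    // Stand-in for the fallback branch (the real code compares timestamps first).
    throw new IllegalStateException("Unable to find previous checkpoint");
  }

  public static void main(String[] args) {
    CommitInfo clusterCommit = new CommitInfo(OperationType.CLUSTER, Map.of());
    System.out.println(resolveCheckpoint(clusterCommit)); // prints Optional.empty
    CommitInfo upsertCommit = new CommitInfo(OperationType.UPSERT, Map.of(CHECKPOINT_KEY, "offset-42"));
    System.out.println(resolveCheckpoint(upsertCommit)); // prints Optional[offset-42]
  }
}
```

Placing the CLUSTER check before the fallback is what keeps a clustering commit from being treated as a table with a missing checkpoint.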




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org