You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/10/07 17:56:51 UTC

[incubator-hudi] branch master updated: [HUDI-232] Implement sealing/unsealing for HoodieRecord class (#938)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d050d98  [HUDI-232] Implement sealing/unsealing for HoodieRecord class (#938)
d050d98 is described below

commit d050d980715c0d8649531a3ed5ca7a7472b29760
Author: leesf <49...@qq.com>
AuthorDate: Tue Oct 8 01:56:46 2019 +0800

    [HUDI-232] Implement sealing/unsealing for HoodieRecord class (#938)
---
 .../org/apache/hudi/index/InMemoryHashIndex.java   |  2 +
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  2 +
 .../org/apache/hudi/index/hbase/HBaseIndex.java    |  2 +
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  2 +
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  2 +
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  2 +
 .../apache/hudi/func/TestUpdateMapFunction.java    |  2 +
 .../apache/hudi/table/TestCopyOnWriteTable.java    |  4 ++
 .../org/apache/hudi/common/model/HoodieRecord.java | 23 +++++++
 .../apache/hudi/common/model/TestHoodieRecord.java | 74 ++++++++++++++++++++++
 .../hudi/common/util/SpillableMapTestUtils.java    |  2 +
 11 files changed, 117 insertions(+)

diff --git a/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java
index 0ef81cf..fdf2cbf 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/InMemoryHashIndex.java
@@ -132,7 +132,9 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> extends HoodieInde
       while (hoodieRecordIterator.hasNext()) {
         HoodieRecord<T> rec = hoodieRecordIterator.next();
         if (recordLocationMap.containsKey(rec.getKey())) {
+          rec.unseal();
           rec.setCurrentLocation(recordLocationMap.get(rec.getKey()));
+          rec.seal();
         }
         taggedRecords.add(rec);
       }
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index 133f1e4..75016c6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -371,7 +371,9 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> extends HoodieIndex
       // currentLocation 2 times and it will fail the second time. So creating a new in memory
       // copy of the hoodie record.
       record = new HoodieRecord<>(inputRecord);
+      record.unseal();
       record.setCurrentLocation(location.get());
+      record.seal();
     }
     return record;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
index 8399c4f..111d231 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/hbase/HBaseIndex.java
@@ -239,7 +239,9 @@ public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
                       currentRecord = new HoodieRecord(
                           new HoodieKey(currentRecord.getRecordKey(), partitionPath),
                           currentRecord.getData());
+                      currentRecord.unseal();
                       currentRecord.setCurrentLocation(new HoodieRecordLocation(commitTs, fileId));
+                      currentRecord.seal();
                       taggedRecords.add(currentRecord);
                       // the key from Result and the key being processed should be same
                       assert (currentRecord.getRecordKey().contentEquals(keyFromResult));
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 34178e6..0aa4137 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -293,7 +293,9 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieWri
 
   private void writeToBuffer(HoodieRecord<T> record) {
     // update the new location of the record, so we know where to find it next
+    record.unseal();
     record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+    record.seal();
     Option<IndexedRecord> indexedRecord = getIndexedRecord(record);
     if (indexedRecord.isPresent()) {
       recordList.add(indexedRecord.get());
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 4cb935f..afb8f85 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -101,7 +101,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieWri
         IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
         storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
         // update the new location of record, so we know where to find it next
+        record.unseal();
         record.setNewLocation(new HoodieRecordLocation(instantTime, writeStatus.getFileId()));
+        record.seal();
         recordsWritten++;
         insertRecordsWritten++;
       } else {
diff --git a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index e1926d0..a819cf7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -208,7 +208,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWrit
       HoodieRecord<T> record = newRecordsItr.next();
       partitionPath = record.getPartitionPath();
       // update the new location of the record, so we know where to find it next
+      record.unseal();
       record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
+      record.seal();
       //NOTE: Once Records are added to map (spillable-map), DO NOT change it as they won't persist
       keyToNewRecords.put(record.getRecordKey(), record);
     }
diff --git a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java
index 5837628..8bd29b1 100644
--- a/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java
+++ b/hudi-client/src/test/java/org/apache/hudi/func/TestUpdateMapFunction.java
@@ -113,7 +113,9 @@ public class TestUpdateMapFunction extends HoodieClientTestHarness {
       TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
       HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
           rowChange1);
+      record1.unseal();
       record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
+      record1.seal();
       updateRecords.add(record1);
 
       try {
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
index 6c0c0ca..12ef633 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestCopyOnWriteTable.java
@@ -192,7 +192,9 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     TestRawTripPayload updateRowChanges1 = new TestRawTripPayload(updateRecordStr1);
     HoodieRecord updatedRecord1 = new HoodieRecord(
         new HoodieKey(updateRowChanges1.getRowKey(), updateRowChanges1.getPartitionPath()), updateRowChanges1);
+    updatedRecord1.unseal();
     updatedRecord1.setCurrentLocation(new HoodieRecordLocation(null, FSUtils.getFileId(parquetFile.getName())));
+    updatedRecord1.seal();
 
     TestRawTripPayload rowChange4 = new TestRawTripPayload(recordStr4);
     HoodieRecord insertedRecord1 = new HoodieRecord(
@@ -407,7 +409,9 @@ public class TestCopyOnWriteTable extends HoodieClientTestHarness {
     List<HoodieRecord> insertRecords = dataGenerator.generateInserts("001", numInserts);
     List<HoodieRecord> updateRecords = dataGenerator.generateUpdates("001", numUpdates);
     for (HoodieRecord updateRec : updateRecords) {
+      updateRec.unseal();
       updateRec.setCurrentLocation(new HoodieRecordLocation("001", "file1"));
+      updateRec.seal();
     }
     List<HoodieRecord> records = new ArrayList<>();
     records.addAll(insertRecords);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index c03f175..a5043de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -63,17 +63,24 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
    */
   private HoodieRecordLocation newLocation;
 
+  /**
+   * Indicates whether the record is sealed, i.e. whether its location setters currently reject modification.
+   */
+  private boolean sealed;
+
   public HoodieRecord(HoodieKey key, T data) {
     this.key = key;
     this.data = data;
     this.currentLocation = null;
     this.newLocation = null;
+    this.sealed = false;
   }
 
   public HoodieRecord(HoodieRecord<T> record) {
     this(record.key, record.data);
     this.currentLocation = record.currentLocation;
     this.newLocation = record.newLocation;
+    this.sealed = record.sealed;
   }
 
   public HoodieKey getKey() {
@@ -100,6 +107,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
    * Sets the current currentLocation of the record. This should happen exactly-once
    */
   public HoodieRecord setCurrentLocation(HoodieRecordLocation location) {
+    checkState();
     assert currentLocation == null;
     this.currentLocation = location;
     return this;
@@ -114,6 +122,7 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
    * exactly-once.
    */
   public HoodieRecord setNewLocation(HoodieRecordLocation location) {
+    checkState();
     assert newLocation == null;
     this.newLocation = location;
     return this;
@@ -170,4 +179,18 @@ public class HoodieRecord<T extends HoodieRecordPayload> implements Serializable
     assert key != null;
     return key.getRecordKey();
   }
+
+  public void seal() {
+    this.sealed = true; // from now on checkState() throws, so the location setters reject changes
+  }
+
+  public void unseal() {
+    this.sealed = false; // checkState() passes again until the next seal()
+  }
+
+  public void checkState() {
+    if (sealed) { // called first by setCurrentLocation and setNewLocation
+      throw new UnsupportedOperationException("Not allowed to modify after sealed");
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
new file mode 100644
index 0000000..408fedc
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieRecord.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import static org.junit.Assert.fail;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.SchemaTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link HoodieRecord}.
+ */
+public class TestHoodieRecord {
+
+  private HoodieRecord hoodieRecord; // rebuilt in setUp(); the constructor leaves it unsealed
+
+  @Before
+  public void setUp() throws Exception {
+    final List<IndexedRecord> indexedRecords = SchemaTestUtil.generateHoodieTestRecords(0, 1);
+    final List<HoodieRecord> hoodieRecords = indexedRecords.stream()
+              .map(r -> new HoodieRecord(new HoodieKey(UUID.randomUUID().toString(), "0000/00/00"),
+                      new AvroBinaryTestPayload(Option.of((GenericRecord) r)))).collect(Collectors.toList());
+    hoodieRecord = hoodieRecords.get(0);
+  }
+
+  @Test
+  public void testModificationAfterSeal() {
+    hoodieRecord.seal();
+    final HoodieRecordLocation location = new HoodieRecordLocation("100", "0");
+    try {
+      hoodieRecord.setCurrentLocation(location);
+      fail("should fail since modification after sealed is not allowed");
+    } catch (UnsupportedOperationException e) { // anything else propagates with its own stack trace
+      Assert.assertTrue(e.getMessage().contains("sealed"));
+    }
+  }
+
+  @Test
+  public void testNormalModification() {
+    hoodieRecord.unseal();
+    final HoodieRecordLocation location = new HoodieRecordLocation("100", "0");
+    hoodieRecord.setCurrentLocation(location);
+    hoodieRecord.seal();
+    // Unsealing again must re-enable modification; neither setter may throw in this test.
+    hoodieRecord.unseal();
+    hoodieRecord.setNewLocation(location);
+    hoodieRecord.seal();
+  }
+}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java
index 62a36e8..c0b2d8f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/SpillableMapTestUtils.java
@@ -45,7 +45,9 @@ public class SpillableMapTestUtils {
           recordKeys.add(key);
           HoodieRecord record = new HoodieRecord<>(new HoodieKey(key, partitionPath),
               new HoodieAvroPayload(Option.of((GenericRecord) r)));
+          record.unseal();
           record.setCurrentLocation(new HoodieRecordLocation("DUMMY_COMMIT_TIME", "DUMMY_FILE_ID"));
+          record.seal();
           records.put(key, record);
         });
     return recordKeys;