Posted to commits@hudi.apache.org by na...@apache.org on 2020/04/09 17:15:45 UTC

[incubator-hudi] branch master updated: [HUDI-568] Improve unit test coverage

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f5f34bb  [HUDI-568] Improve unit test coverage
f5f34bb is described below

commit f5f34bb1c16e6d070668486eba2a29f554c0bbc7
Author: Ramachandran Madtas Subramaniam <mr...@uber.com>
AuthorDate: Wed Mar 11 12:58:31 2020 -0700

    [HUDI-568] Improve unit test coverage
    
    Classes improved:
    * HoodieTableMetaClient
    * RocksDBDAO
    * HoodieRealtimeFileSplit
---
 .../hudi/common/util/collection/RocksDBDAO.java    |   3 -
 .../common/table/TestHoodieTableMetaClient.java    |  19 +++
 .../common/util/collection/TestRocksDBManager.java | 168 ++++++++++++++++++---
 hudi-hadoop-mr/pom.xml                             |   6 +
 .../realtime/TestHoodieRealtimeFileSplit.java      | 162 ++++++++++++++++++++
 5 files changed, 335 insertions(+), 23 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
index 84b4953..3c08460 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
@@ -75,9 +75,6 @@ public class RocksDBDAO {
    * Create RocksDB if not initialized.
    */
   private RocksDB getRocksDB() {
-    if (null == rocksDB) {
-      init();
-    }
     return rocksDB;
   }
 
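The hunk above removes the lazy-initialization branch from getRocksDB(), turning it into a plain accessor: the handle must now be opened eagerly before first use. A minimal sketch of the pattern this implies, assuming init() is driven from the constructor (the constructor is not shown in this diff):

    import org.rocksdb.RocksDB;

    public class RocksDBDAO {
      private transient RocksDB rocksDB;

      public RocksDBDAO(String basePath, String rocksdbBasePath) {
        init(); // open eagerly; getRocksDB() no longer re-initializes on demand
      }

      private void init() {
        // sketch only: the real method builds Options, opens column families, etc.;
        // the point is that rocksDB is assigned before the first getRocksDB() call
      }

      private RocksDB getRocksDB() {
        return rocksDB;
      }
    }

This also closes a subtle hazard for the tests: if close() nulls the field, the old null check could silently resurrect a closed DAO on the next access.
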
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index ae376b4..e1279d1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -94,4 +95,22 @@ public class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
     assertArrayEquals("Commit value should be \"test-detail\"", "test-detail".getBytes(),
         activeCommitTimeline.getInstantDetails(completedInstant).get());
   }
+
+  @Test
+  public void testEquals() throws IOException {
+    HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
+    HoodieTableMetaClient metaClient2 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
+    assertEquals(metaClient1, metaClient1);
+    assertEquals(metaClient1, metaClient2);
+    assertNotEquals(metaClient1, null);
+    assertNotEquals(metaClient1, new Object());
+  }
+
+  @Test
+  public void testToString() throws IOException {
+    HoodieTableMetaClient metaClient1 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
+    HoodieTableMetaClient metaClient2 = HoodieTestUtils.init(folder.getRoot().getAbsolutePath(), getTableType());
+    assertEquals(metaClient1.toString(), metaClient2.toString());
+    assertNotEquals(metaClient1.toString(), new Object().toString());
+  }
 }
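
The two tests added above pin down the equals()/toString() contract for HoodieTableMetaClient: two clients initialized on the same path compare equal, equality is reflexive, and null or foreign types are rejected. A sketch of the kind of implementation these assertions exercise, assuming equality is defined over the base path and table type (the real class may compare more state):

    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;              // reflexivity: assertEquals(metaClient1, metaClient1)
      }
      if (o == null || getClass() != o.getClass()) {
        return false;             // covers assertNotEquals(metaClient1, null) and new Object()
      }
      HoodieTableMetaClient that = (HoodieTableMetaClient) o;
      return Objects.equals(basePath, that.basePath) && tableType == that.tableType;
    }

    @Override
    public int hashCode() {
      return Objects.hash(basePath, tableType);
    }
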
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java
index f024a0b..5f30fa1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestRocksDBManager.java
@@ -20,15 +20,17 @@ package org.apache.hudi.common.util.collection;
 
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 
-import org.junit.AfterClass;
+import org.junit.After;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -41,16 +43,16 @@ import java.util.stream.IntStream;
  */
 public class TestRocksDBManager {
 
-  private static RocksDBDAO dbManager;
+  private RocksDBDAO dbManager;
 
-  @BeforeClass
-  public static void setUpClass() {
-    dbManager = new RocksDBDAO("/dummy/path",
+  @Before
+  public void setUpClass() {
+    dbManager = new RocksDBDAO("/dummy/path/" + UUID.randomUUID().toString(),
         FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath());
   }
 
-  @AfterClass
-  public static void tearDownClass() {
+  @After
+  public void tearDownClass() {
     if (dbManager != null) {
       dbManager.close();
       dbManager = null;
@@ -68,16 +70,19 @@ public class TestRocksDBManager {
     String family2 = "family2";
     List<String> colFamilies = Arrays.asList(family1, family2);
 
-    List<Payload> payloads = IntStream.range(0, 100).mapToObj(index -> {
+    final List<Payload<String>> payloads = new ArrayList<>();
+    IntStream.range(0, 100).forEach(index -> {
       String prefix = prefixes.get(index % 4);
       String key = prefix + UUID.randomUUID().toString();
       String family = colFamilies.get(index % 2);
       String val = "VALUE_" + UUID.randomUUID().toString();
-      return new Payload(prefix, key, val, family);
-    }).collect(Collectors.toList());
+      payloads.add(new Payload(prefix, key, val, family));
+    });
 
     colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
     colFamilies.forEach(family -> dbManager.addColumnFamily(family));
+    colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
+    colFamilies.forEach(family -> dbManager.addColumnFamily(family));
 
     Map<String, Map<String, Integer>> countsMap = new HashMap<>();
     payloads.forEach(payload -> {
@@ -103,21 +108,114 @@ public class TestRocksDBManager {
             expCount == null ? 0L : expCount.longValue(), gotPayloads.size());
         gotPayloads.forEach(p -> {
           Assert.assertEquals(p.getRight().getFamily(), family);
-          Assert.assertTrue(p.getRight().getKey().startsWith(prefix));
+          Assert.assertTrue(p.getRight().getKey().toString().startsWith(prefix));
         });
       });
     });
 
-    payloads.forEach(payload -> {
+    payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> {
       Payload p = dbManager.get(payload.getFamily(), payload.getKey());
       Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
 
-      // Now, delete the key
       dbManager.delete(payload.getFamily(), payload.getKey());
 
-      // Now retrieve
       Payload p2 = dbManager.get(payload.getFamily(), payload.getKey());
-      Assert.assertNull("Retrieved correct payload for key :" + p.getKey(), p2);
+      Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2);
+    });
+
+    colFamilies.forEach(family -> {
+      dbManager.prefixDelete(family, prefix1);
+
+      int got = dbManager.prefixSearch(family, prefix1).collect(Collectors.toList()).size();
+      Assert.assertEquals("Expected prefix delete to remove all items with prefix (" + prefix1 + ") for family: " + family, 0, got);
+    });
+
+    payloads.stream().filter(p -> !p.getPrefix().equalsIgnoreCase(prefix1)).forEach(payload -> {
+      Payload p2 = dbManager.get(payload.getFamily(), payload.getKey());
+      Assert.assertNull("Retrieved correct payload for key :" + payload.getKey(), p2);
+    });
+
+    // Now do a prefix search
+    colFamilies.forEach(family -> {
+      prefixes.stream().filter(p -> !p.equalsIgnoreCase(prefix1)).forEach(prefix -> {
+        List<Pair<String, Payload>> gotPayloads =
+            dbManager.<Payload>prefixSearch(family, prefix).collect(Collectors.toList());
+        Assert.assertEquals("Size check for prefix (" + prefix + ") and family (" + family + ")", 0,
+            gotPayloads.size());
+      });
+    });
+
+    String rocksDBBasePath = dbManager.getRocksDBBasePath();
+    dbManager.close();
+    Assert.assertFalse(new File(rocksDBBasePath).exists());
+  }
+
+  @Test
+  public void testWithSerializableKey() {
+    String prefix1 = "prefix1_";
+    String prefix2 = "prefix2_";
+    String prefix3 = "prefix3_";
+    String prefix4 = "prefix4_";
+    List<String> prefixes = Arrays.asList(prefix1, prefix2, prefix3, prefix4);
+    String family1 = "family1";
+    String family2 = "family2";
+    List<String> colFamilies = Arrays.asList(family1, family2);
+
+    final List<Payload<PayloadKey>> payloads = new ArrayList<>();
+    IntStream.range(0, 100).forEach(index -> {
+      String prefix = prefixes.get(index % 4);
+      String key = prefix + UUID.randomUUID().toString();
+      String family = colFamilies.get(index % 2);
+      String val = "VALUE_" + UUID.randomUUID().toString();
+      payloads.add(new Payload(prefix, new PayloadKey(key), val, family));
+    });
+
+    colFamilies.forEach(family -> dbManager.dropColumnFamily(family));
+    colFamilies.forEach(family -> dbManager.addColumnFamily(family));
+
+    Map<String, Map<String, Integer>> countsMap = new HashMap<>();
+    dbManager.writeBatch(batch -> {
+      payloads.forEach(payload -> {
+        dbManager.putInBatch(batch, payload.getFamily(), payload.getKey(), payload);
+
+        if (!countsMap.containsKey(payload.family)) {
+          countsMap.put(payload.family, new HashMap<>());
+        }
+        Map<String, Integer> c = countsMap.get(payload.family);
+        if (!c.containsKey(payload.prefix)) {
+          c.put(payload.prefix, 0);
+        }
+        int currCount = c.get(payload.prefix);
+        c.put(payload.prefix, currCount + 1);
+      });
+    });
+
+    Iterator<List<Payload<PayloadKey>>> payloadSplits = payloads.stream()
+        .collect(Collectors.partitioningBy(s -> payloads.indexOf(s) > payloads.size() / 2)).values()
+        .iterator();
+
+    payloads.forEach(payload -> {
+      Payload p = dbManager.get(payload.getFamily(), payload.getKey());
+      Assert.assertEquals("Retrieved correct payload for key :" + payload.getKey(), payload, p);
+    });
+
+    payloadSplits.next().forEach(payload -> {
+      dbManager.delete(payload.getFamily(), payload.getKey());
+      Payload want = dbManager.get(payload.getFamily(), payload.getKey());
+      Assert.assertNull("Verify deleted during single delete for key :" + payload.getKey(), want);
+    });
+
+    dbManager.writeBatch(batch -> {
+      payloadSplits.next().forEach(payload -> {
+        dbManager.deleteInBatch(batch, payload.getFamily(), payload.getKey());
+        Payload want = dbManager.get(payload.getFamily(), payload.getKey());
+        Assert.assertEquals("Verify not deleted during batch delete in progress for key :" + payload.getKey(), payload, want);
+      });
+    });
+
+    payloads.forEach(payload -> {
+      Payload want = dbManager.get(payload.getFamily(), payload.getKey());
+      Assert.assertNull("Verify delete for key :" + payload.getKey(), want);
     });
 
     // Now do a prefix search
@@ -135,17 +233,47 @@ public class TestRocksDBManager {
     Assert.assertFalse(new File(rocksDBBasePath).exists());
   }
 
+  public static class PayloadKey implements Serializable {
+    private String key;
+
+    public PayloadKey(String key) {
+      this.key = key;
+    }
+
+    @Override
+    public String toString() {
+      return key;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      PayloadKey that = (PayloadKey) o;
+      return Objects.equals(key, that.key);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(key);
+    }
+  }
+
   /**
    * A payload definition for {@link TestRocksDBManager}.
    */
-  public static class Payload implements Serializable {
+  public static class Payload<T> implements Serializable {
 
     private final String prefix;
-    private final String key;
+    private final T key;
     private final String val;
     private final String family;
 
-    public Payload(String prefix, String key, String val, String family) {
+    public Payload(String prefix, T key, String val, String family) {
       this.prefix = prefix;
       this.key = key;
       this.val = val;
@@ -156,7 +284,7 @@ public class TestRocksDBManager {
       return prefix;
     }
 
-    public String getKey() {
+    public T getKey() {
       return key;
     }
 
diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 0b35256..578b9a0 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -101,6 +101,12 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
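
The new mockito-all dependency is declared test-scoped with no version, which implies the version is managed in the parent pom's dependencyManagement. mockito-all is the Mockito 1.x uber-jar, which matches the org.mockito.Matchers and getArgumentAt(...) APIs used by the new test below.
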
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
new file mode 100644
index 0000000..cab1e66
--- /dev/null
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.InOrder;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.AdditionalMatchers.aryEq;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyByte;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+public class TestHoodieRealtimeFileSplit {
+
+  private HoodieRealtimeFileSplit split;
+  private String basePath;
+  private List<String> deltaLogPaths;
+  private String fileSplitName;
+  private FileSplit baseFileSplit;
+  private String maxCommitTime;
+  private TemporaryFolder tmp;
+
+  @Before
+  public void setUp() throws Exception {
+    tmp = new TemporaryFolder();
+    tmp.create();
+
+    basePath = tmp.getRoot().toString();
+    deltaLogPaths = Collections.singletonList(basePath + "/1.log");
+    fileSplitName = basePath + "/test.file";
+    baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[]{});
+    maxCommitTime = "10001";
+
+    split = new HoodieRealtimeFileSplit(baseFileSplit, basePath, deltaLogPaths, maxCommitTime);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    tmp.delete();
+  }
+
+  @Test
+  public void testWrite() throws IOException {
+    // create a mock for DataOutput that will be passed to the write method
+    // this way we can capture and verify that the correct arguments were passed
+    DataOutput out = mock(DataOutput.class);
+
+    // register expected method calls for the void methods
+    // so that we can verify what was called after the method under test finishes
+    doNothing().when(out).writeByte(anyByte());
+    doNothing().when(out).writeInt(anyInt());
+    doNothing().when(out).write(any(byte[].class), anyInt(), anyInt());
+    doNothing().when(out).write(any(byte[].class));
+
+    // call the method we want to test with the mocked input
+    split.write(out);
+
+    // verify the method calls on the mocked object, in the order they were made
+    InOrder inorder = inOrder(out);
+    inorder.verify(out, times(1)).writeByte(eq(fileSplitName.length()));
+    inorder.verify(out, times(1)).write(aryEq(Text.encode(fileSplitName).array()), eq(0), eq(fileSplitName.length()));
+    inorder.verify(out, times(1)).writeInt(eq(basePath.length()));
+    inorder.verify(out, times(1)).write(aryEq(basePath.getBytes(StandardCharsets.UTF_8)));
+    inorder.verify(out, times(1)).writeInt(eq(maxCommitTime.length()));
+    inorder.verify(out, times(1)).write(aryEq(maxCommitTime.getBytes(StandardCharsets.UTF_8)));
+    inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.size()));
+    inorder.verify(out, times(1)).writeInt(eq(deltaLogPaths.get(0).length()));
+    inorder.verify(out, times(1)).write(aryEq(deltaLogPaths.get(0).getBytes(StandardCharsets.UTF_8)));
+    // verify that no further interactions happened on the mocked object
+    inorder.verifyNoMoreInteractions();
+  }
+
+  @Test
+  public void testReadFields() throws IOException {
+    // create a mock for DataInput that will be passed to the readFields method
+    // this way we can control exactly what bytes the method reads back
+    DataInput in = mock(DataInput.class);
+
+    // register the mock responses to be returned when a particular method call happens
+    // on the mocked object
+    when(in.readByte()).thenReturn((byte) fileSplitName.length());
+    // an Answer implementation is used to return the stubbed responses in sequence,
+    // since the same method is called several times and each call must see the next value
+    when(in.readInt()).thenAnswer(new Answer<Integer>() {
+      private int count = 0;
+      private int[] answers = new int[]{basePath.length(), maxCommitTime.length(), deltaLogPaths.size(), deltaLogPaths.get(0).length()};
+
+      @Override
+      public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+        return answers[count++];
+      }
+    });
+    Answer<Void> readFullyAnswer = new Answer<Void>() {
+      private int count = 0;
+      private byte[][] answers = new byte[][]{
+          fileSplitName.getBytes(StandardCharsets.UTF_8),
+          basePath.getBytes(StandardCharsets.UTF_8),
+          maxCommitTime.getBytes(StandardCharsets.UTF_8),
+          deltaLogPaths.get(0).getBytes(StandardCharsets.UTF_8),
+      };
+
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        byte[] bytes = invocation.getArgumentAt(0, byte[].class);
+        byte[] answer = answers[count++];
+        System.arraycopy(answer, 0, bytes, 0, answer.length);
+        return null;
+      }
+    };
+    doAnswer(readFullyAnswer).when(in).readFully(any());
+    doAnswer(readFullyAnswer).when(in).readFully(any(), anyInt(), anyInt());
+
+    // call readFields with mocked object
+    HoodieRealtimeFileSplit read = new HoodieRealtimeFileSplit();
+    read.readFields(in);
+
+    // assert proper returns after reading from the mocked object
+    assertEquals(basePath, read.getBasePath());
+    assertEquals(maxCommitTime, read.getMaxCommitTime());
+    assertEquals(deltaLogPaths, read.getDeltaLogPaths());
+    assertEquals(split.toString(), read.toString());
+  }
+}
\ No newline at end of file
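
Read together, the in-order verifications in testWrite double as documentation of the split's on-the-wire layout. A reconstruction of what they imply (inferred from the mocks; the actual write() implementation is not part of this diff):

    // layout implied by the testWrite verifications (a reconstruction, not the source):
    //   writeByte(pathLen), write(pathBytes, 0, pathLen)   -- file path (FileSplit header); a single
    //                                                         length byte matches Hadoop's vint
    //                                                         encoding for lengths under 128
    //   writeInt(len), write(bytes)                        -- basePath
    //   writeInt(len), write(bytes)                        -- maxCommitTime
    //   writeInt(count)                                    -- number of delta log paths
    //   writeInt(len), write(bytes)  (repeated per path)   -- each delta log path

In testReadFields, the counting Answer for readInt() could equally be written with Mockito's consecutive stubbing, when(in.readInt()).thenReturn(a, b, c, d); a custom Answer is only strictly needed for readFully(), which must copy bytes into the caller's buffer as a side effect.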