You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/28 02:26:57 UTC

[GitHub] [hudi] nsivabalan commented on a change in pull request #3117: [HUDI-2028] Implement RockDbBasedMap as an alternate to DiskBasedMap in ExternalSpillableMap

nsivabalan commented on a change in pull request #3117:
URL: https://github.com/apache/hudi/pull/3117#discussion_r659419442



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/SpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * This class provides a disk spillable only map implementation that is based on RocksDB.
+ */
+public final class SpillableRocksDBBasedMap<T extends Serializable, R extends Serializable> implements SpillableDiskMap<T, R> {
+  // ColumnFamily allows partitioning data within RockDB, which allows
+  // independent configuration and faster deletes across partitions
+  // https://github.com/facebook/rocksdb/wiki/Column-Families
+  // For this use case, we use a single static column family/ partition
+  //
+  private static final String COLUMN_FAMILY_NAME = "spill_map";
+
+  private static final Logger LOG = LogManager.getLogger(SpillableRocksDBBasedMap.class);
+  // Stores the key and corresponding value's latest metadata spilled to disk
+  private final Set<T> keySet;
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDb;
+
+  public SpillableRocksDBBasedMap(String rocksDbStoragePath) throws IOException {
+    this.keySet = new HashSet<>();
+    this.rocksDbStoragePath = rocksDbStoragePath;
+  }
+
+  @Override
+  public int size() {
+    return keySet.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return keySet.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return keySet.contains((T) key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("unable to compare values in map");
+  }
+
+  @Override
+  public R get(Object key) {
+    if (!containsKey(key)) {
+      return null;
+    }
+    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+  }
+
+  @Override
+  public R put(T key, R value) {
+    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    keySet.add(key);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R value = get(key);
+    if (value != null) {
+      keySet.remove((T) key);
+      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+    }
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends T, ? extends R> keyValues) {
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    keySet.addAll(keyValues.keySet());
+  }
+
+  @Override
+  public void clear() {
+    close();
+  }
+
+  @Override
+  public @NotNull Set<T> keySet() {
+    return keySet;
+  }
+
+  @Override
+  public @NotNull Collection<R> values() {
+    throw new HoodieException("Unsupported Operation Exception");
+  }
+
+  @Override
+  public @NotNull Set<Entry<T, R>> entrySet() {
+    Set<Entry<T, R>> entrySet = new HashSet<>();
+    for (T key : keySet) {
+      entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
+    }
+    return entrySet;
+  }
+
+  /**
+   * Custom iterator to iterate over values written to disk.
+   */
+  @Override
+  public @NotNull Iterator<R> iterator() {
+    return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+  }
+
+  @Override
+  public Stream<R> valueStream() {
+    return keySet.stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData));
+  }
+
+  @Override
+  public long sizeOfFileOnDiskInBytes() {
+    return getRocksDb().getTotalBytesWritten();
+  }
+
+  @Override
+  public void close() {
+    keySet.clear();
+    if (null != rocksDb) {
+      rocksDb.close();
+    }
+    rocksDb = null;
+  }
+
+  private RocksDBDAO getRocksDb() {
+    if (null == rocksDb) {

Review comment:
       do we need to think about thread safety here? 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/SpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * This class provides a disk spillable only map implementation that is based on RocksDB.
+ */
+public final class SpillableRocksDBBasedMap<T extends Serializable, R extends Serializable> implements SpillableDiskMap<T, R> {
+  // ColumnFamily allows partitioning data within RockDB, which allows
+  // independent configuration and faster deletes across partitions
+  // https://github.com/facebook/rocksdb/wiki/Column-Families
+  // For this use case, we use a single static column family/ partition
+  //
+  private static final String COLUMN_FAMILY_NAME = "spill_map";
+
+  private static final Logger LOG = LogManager.getLogger(SpillableRocksDBBasedMap.class);
+  // Stores the key and corresponding value's latest metadata spilled to disk
+  private final Set<T> keySet;
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDb;
+
+  public SpillableRocksDBBasedMap(String rocksDbStoragePath) throws IOException {
+    this.keySet = new HashSet<>();
+    this.rocksDbStoragePath = rocksDbStoragePath;
+  }
+
+  @Override
+  public int size() {
+    return keySet.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return keySet.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return keySet.contains((T) key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("unable to compare values in map");
+  }
+
+  @Override
+  public R get(Object key) {
+    if (!containsKey(key)) {
+      return null;
+    }
+    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+  }
+
+  @Override
+  public R put(T key, R value) {
+    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    keySet.add(key);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R value = get(key);
+    if (value != null) {
+      keySet.remove((T) key);
+      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+    }
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends T, ? extends R> keyValues) {
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    keySet.addAll(keyValues.keySet());
+  }
+
+  @Override
+  public void clear() {
+    close();
+  }
+
+  @Override
+  public @NotNull Set<T> keySet() {
+    return keySet;
+  }
+
+  @Override
+  public @NotNull Collection<R> values() {
+    throw new HoodieException("Unsupported Operation Exception");
+  }
+
+  @Override
+  public @NotNull Set<Entry<T, R>> entrySet() {
+    Set<Entry<T, R>> entrySet = new HashSet<>();
+    for (T key : keySet) {
+      entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
+    }
+    return entrySet;
+  }
+
+  /**
+   * Custom iterator to iterate over values written to disk.
+   */
+  @Override
+  public @NotNull Iterator<R> iterator() {
+    return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+  }
+
+  @Override
+  public Stream<R> valueStream() {
+    return keySet.stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData));
+  }
+
+  @Override
+  public long sizeOfFileOnDiskInBytes() {
+    return getRocksDb().getTotalBytesWritten();
+  }
+
+  @Override
+  public void close() {
+    keySet.clear();
+    if (null != rocksDb) {
+      rocksDb.close();
+    }
+    rocksDb = null;

Review comment:
       minor. keySet = null

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/SpillableDiskMap.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * The spillable map that provides the map interface for storing records in disk/ db after they
+ * spill over from memory. Used by {@link ExternalSpillableMap}.
+ * @param <T>

Review comment:
       docs please. can you add what does T and R refer to.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/SpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * This class provides a disk spillable only map implementation that is based on RocksDB.
+ */
+public final class SpillableRocksDBBasedMap<T extends Serializable, R extends Serializable> implements SpillableDiskMap<T, R> {
+  // ColumnFamily allows partitioning data within RockDB, which allows
+  // independent configuration and faster deletes across partitions
+  // https://github.com/facebook/rocksdb/wiki/Column-Families
+  // For this use case, we use a single static column family/ partition
+  //
+  private static final String COLUMN_FAMILY_NAME = "spill_map";
+
+  private static final Logger LOG = LogManager.getLogger(SpillableRocksDBBasedMap.class);
+  // Stores the key and corresponding value's latest metadata spilled to disk
+  private final Set<T> keySet;
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDb;
+
+  public SpillableRocksDBBasedMap(String rocksDbStoragePath) throws IOException {
+    this.keySet = new HashSet<>();
+    this.rocksDbStoragePath = rocksDbStoragePath;
+  }
+
+  @Override
+  public int size() {
+    return keySet.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return keySet.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return keySet.contains((T) key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("unable to compare values in map");
+  }
+
+  @Override
+  public R get(Object key) {
+    if (!containsKey(key)) {
+      return null;
+    }
+    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+  }
+
+  @Override
+  public R put(T key, R value) {
+    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    keySet.add(key);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R value = get(key);
+    if (value != null) {
+      keySet.remove((T) key);
+      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+    }
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends T, ? extends R> keyValues) {
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    keySet.addAll(keyValues.keySet());
+  }
+
+  @Override
+  public void clear() {
+    close();
+  }
+
+  @Override
+  public @NotNull Set<T> keySet() {
+    return keySet;
+  }
+
+  @Override
+  public @NotNull Collection<R> values() {
+    throw new HoodieException("Unsupported Operation Exception");
+  }
+
+  @Override
+  public @NotNull Set<Entry<T, R>> entrySet() {
+    Set<Entry<T, R>> entrySet = new HashSet<>();
+    for (T key : keySet) {
+      entrySet.add(new AbstractMap.SimpleEntry<>(key, get(key)));
+    }
+    return entrySet;
+  }
+
+  /**
+   * Custom iterator to iterate over values written to disk.
+   */
+  @Override
+  public @NotNull Iterator<R> iterator() {
+    return getRocksDb().iterator(COLUMN_FAMILY_NAME);
+  }
+
+  @Override
+  public Stream<R> valueStream() {
+    return keySet.stream().sorted().sequential().map(valueMetaData -> (R) get(valueMetaData));

Review comment:
       am not very conversant w/ this spillable map in general. 
   May I know is there a necessity to sort here? I don't see external spillable map in master does any sort for valueStream. also, for putAll() I don't see any validation as such for sorted entries. 

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestSpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.testutils.SpillableMapTestUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test the rocksDb based Map {@link SpillableRocksDBBasedMap}
+ * that is used by {@link ExternalSpillableMap}.
+ */
+public class TestSpillableRocksDBBasedMap extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void setUp() {
+    initPath();
+  }
+
+  @Test
+  public void testSimpleInsertSequential() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    // Ensure the number of records is correct
+    assertEquals(rocksDBBasedMap.size(), recordKeys.size());
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    List<HoodieRecord> oRecords = new ArrayList<>();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      oRecords.add(rec);
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    assertEquals(recordKeys.size(), oRecords.size());
+  }
+
+  @Test
+  public void testSimpleInsertRandomAccess() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+
+    Random random = new Random();
+    for (int i = 0; i < recordKeys.size(); i++) {
+      String key = recordKeys.get(random.nextInt(recordKeys.size()));
+      assert rocksDBBasedMap.get(key) != null;

Review comment:
       same here.

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestSpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.testutils.SpillableMapTestUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test the rocksDb based Map {@link SpillableRocksDBBasedMap}
+ * that is used by {@link ExternalSpillableMap}.
+ */
+public class TestSpillableRocksDBBasedMap extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void setUp() {
+    initPath();
+  }
+
+  @Test
+  public void testSimpleInsertSequential() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    // Ensure the number of records is correct
+    assertEquals(rocksDBBasedMap.size(), recordKeys.size());
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    List<HoodieRecord> oRecords = new ArrayList<>();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      oRecords.add(rec);
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    assertEquals(recordKeys.size(), oRecords.size());
+  }
+
+  @Test
+  public void testSimpleInsertRandomAccess() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+
+    Random random = new Random();
+    for (int i = 0; i < recordKeys.size(); i++) {
+      String key = recordKeys.get(random.nextInt(recordKeys.size()));
+      assert rocksDBBasedMap.get(key) != null;
+    }
+  }
+
+  @Test
+  public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap<>(basePath);

Review comment:
       Do you see lot of commonality between this test and testSimpleInsertSequential? If yes, can you create a private method and re-use code if possible. Making tests more modularized and scalable is always a good practice. 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
##########
@@ -259,6 +284,40 @@ public void close() {
     return entrySet;
   }
 
+  public enum DiskMapType {
+    DISK_MAP("disk_map"),

Review comment:
       java docs please.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/SpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieNotSupportedException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
+
+/**
+ * This class provides a disk spillable only map implementation that is based on RocksDB.
+ */
+public final class SpillableRocksDBBasedMap<T extends Serializable, R extends Serializable> implements SpillableDiskMap<T, R> {
+  // ColumnFamily allows partitioning data within RockDB, which allows
+  // independent configuration and faster deletes across partitions
+  // https://github.com/facebook/rocksdb/wiki/Column-Families
+  // For this use case, we use a single static column family/ partition
+  //
+  private static final String COLUMN_FAMILY_NAME = "spill_map";
+
+  private static final Logger LOG = LogManager.getLogger(SpillableRocksDBBasedMap.class);
+  // Stores the key and corresponding value's latest metadata spilled to disk
+  private final Set<T> keySet;
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDb;
+
+  public SpillableRocksDBBasedMap(String rocksDbStoragePath) throws IOException {
+    this.keySet = new HashSet<>();
+    this.rocksDbStoragePath = rocksDbStoragePath;
+  }
+
+  @Override
+  public int size() {
+    return keySet.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return keySet.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return keySet.contains((T) key);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("unable to compare values in map");
+  }
+
+  @Override
+  public R get(Object key) {
+    if (!containsKey(key)) {
+      return null;
+    }
+    return getRocksDb().get(COLUMN_FAMILY_NAME, (T) key);
+  }
+
+  @Override
+  public R put(T key, R value) {
+    getRocksDb().put(COLUMN_FAMILY_NAME, key, value);
+    keySet.add(key);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R value = get(key);
+    if (value != null) {
+      keySet.remove((T) key);
+      getRocksDb().delete(COLUMN_FAMILY_NAME, (T) key);
+    }
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends T, ? extends R> keyValues) {
+    getRocksDb().writeBatch(batch -> keyValues.forEach((key, value) -> getRocksDb().putInBatch(batch, COLUMN_FAMILY_NAME, key, value)));
+    keySet.addAll(keyValues.keySet());
+  }
+
+  @Override
+  public void clear() {
+    close();
+  }
+
+  @Override
+  public @NotNull Set<T> keySet() {

Review comment:
       nit for my understanding: I haven't seen NotNull annotation being used in general within hudi (atleast for the codes I have written or reviewed). Have we started embracing it? 

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestSpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.testutils.SpillableMapTestUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test the rocksDb based Map {@link SpillableRocksDBBasedMap}
+ * that is used by {@link ExternalSpillableMap}.
+ */
+public class TestSpillableRocksDBBasedMap extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void setUp() {
+    initPath();
+  }
+
+  @Test
+  public void testSimpleInsertSequential() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    // Ensure the number of records is correct
+    assertEquals(rocksDBBasedMap.size(), recordKeys.size());
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    List<HoodieRecord> oRecords = new ArrayList<>();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      oRecords.add(rec);
+      assert recordKeys.contains(rec.getRecordKey());

Review comment:
       minor. why are we not testing for value equality. is there any particular reason ? 

##########
File path: hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestSpillableRocksDBBasedMap.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.testutils.SpillableMapTestUtils;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test the rocksDb based Map {@link SpillableRocksDBBasedMap}
+ * that is used by {@link ExternalSpillableMap}.
+ */
+public class TestSpillableRocksDBBasedMap extends HoodieCommonTestHarness {
+
+  @BeforeEach
+  public void setUp() {
+    initPath();
+  }
+
+  @Test
+  public void testSimpleInsertSequential() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    // Ensure the number of records is correct
+    assertEquals(rocksDBBasedMap.size(), recordKeys.size());
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    List<HoodieRecord> oRecords = new ArrayList<>();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      oRecords.add(rec);
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    assertEquals(recordKeys.size(), oRecords.size());
+  }
+
+  @Test
+  public void testSimpleInsertRandomAccess() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+    ((GenericRecord) iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+
+    Random random = new Random();
+    for (int i = 0; i < recordKeys.size(); i++) {
+      String key = recordKeys.get(random.nextInt(recordKeys.size()));
+      assert rocksDBBasedMap.get(key) != null;
+    }
+  }
+
+  @Test
+  public void testSimpleInsertWithoutHoodieMetadata() throws IOException, URISyntaxException {
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap<>(basePath);
+    List<HoodieRecord> hoodieRecords = SchemaTestUtil.generateHoodieTestRecordsWithoutHoodieMetadata(0, 1000);
+    Set<String> recordKeys = new HashSet<>();
+    // insert generated records into the map
+    hoodieRecords.forEach(r -> {
+      rocksDBBasedMap.put(r.getRecordKey(), r);
+      recordKeys.add(r.getRecordKey());
+    });
+    // make sure records have spilled to disk
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > 0);
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    List<HoodieRecord> oRecords = new ArrayList<>();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      oRecords.add(rec);
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+  }
+
+  @Test
+  public void testSimpleUpsert() throws IOException, URISyntaxException {
+    Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
+
+    SpillableRocksDBBasedMap rocksDBBasedMap = new SpillableRocksDBBasedMap<>(basePath);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 100);
+
+    // perform some inserts
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, rocksDBBasedMap);
+
+    long fileSize = rocksDBBasedMap.sizeOfFileOnDiskInBytes();
+    // make sure records have spilled to disk
+    assertTrue(fileSize > 0);
+
+    // generate updates from inserts
+    List<IndexedRecord> updatedRecords = SchemaTestUtil.updateHoodieTestRecords(recordKeys,
+        SchemaTestUtil.generateHoodieTestRecords(0, 100), HoodieActiveTimeline.createNewInstantTime());
+    String newCommitTime =
+        ((GenericRecord) updatedRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+
+    // perform upserts
+    recordKeys = SpillableMapTestUtils.upsertRecords(updatedRecords, rocksDBBasedMap);
+
+    // On disk (rocksDb), upserts should actual replace the original records, and the size
+    // should not increase. But due to the way the bytes are accounted,
+    // sizeOfFileOnDiskInBytes only maintains the total
+    // bytes written, and will increase with upserts.
+    assertTrue(rocksDBBasedMap.sizeOfFileOnDiskInBytes() > fileSize);
+
+    // Upserted records (on disk) should have the latest commit time
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = rocksDBBasedMap.iterator();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      assert recordKeys.contains(rec.getRecordKey());
+      try {
+        IndexedRecord indexedRecord = (IndexedRecord) rec.getData().getInsertValue(schema).get();

Review comment:
       Can we also cover this scenario. 
   upsert for some fraction of inserts (50% may be). And ensure that upserted records have latest commit time and inserted records have older commit time. 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/util/collection/SpillableDiskMap.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * The spillable map that provides the map interface for storing records in disk/ db after they
+ * spill over from memory. Used by {@link ExternalSpillableMap}.
+ * @param <T>
+ * @param <R>
+ */
+public interface SpillableDiskMap<T extends Serializable, R extends Serializable> extends Map<T, R>, Iterable<R> {
+  Stream<R> valueStream();
+
+  long sizeOfFileOnDiskInBytes();

Review comment:
       java docs would be nice for these methods.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org