Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/15 22:26:09 UTC

[GitHub] [spark] dongjoon-hyun opened a new pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

dongjoon-hyun opened a new pull request #34913:
URL: https://github.com/apache/spark/pull/34913


   ### What changes were proposed in this pull request?
   
   This PR aims to add `RocksDB` implementation for `KVStore`.
   
   ### Why are the changes needed?
   
   So far, we have only a `LevelDB` implementation, which blocks Apple Silicon support.
   In contrast, the `RocksDB` community is actively working on Apple Silicon support.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. This is used only internally by Spark.
   
   ### How was this patch tested?
   
   Pass the CIs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995321855


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50719/
   




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995316436


   Thank you, @viirya . 




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995347583


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50720/
   




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770270000



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;

Review comment:
       This import order needs to be fixed.
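The requested fix is to move the `java.*` imports ahead of the third-party ones, matching the grouping already used by `RocksDB.java` in this same PR (`java.*`, then static imports, then third-party such as `com.google.*` and `org.rocksdb.*`, then `org.apache.spark.*`). The class below is a hypothetical helper that encodes that grouping so the two orders can be compared; it is a sketch of the ordering being asked for, not Spark's actual checkstyle rule.

```java
import java.util.Arrays;
import java.util.List;

public class ImportOrderCheck {

  // Hypothetical group ranks mirroring RocksDB.java in this PR:
  // java.*/javax.* (0), static imports (1), third-party (2), org.apache.spark.* (3).
  static int group(String importLine) {
    String name = importLine.trim().replaceFirst("^import\\s+", "").replace(";", "").trim();
    if (name.startsWith("static ")) {
      return 1;
    }
    if (name.startsWith("java.") || name.startsWith("javax.")) {
      return 0;
    }
    if (name.startsWith("org.apache.spark.")) {
      return 3;
    }
    return 2;
  }

  // True if the group rank never decreases from one import to the next.
  static boolean isOrdered(List<String> imports) {
    int previous = -1;
    for (String line : imports) {
      int rank = group(line);
      if (rank < previous) {
        return false;
      }
      previous = rank;
    }
    return true;
  }

  public static void main(String[] args) {
    // Order as submitted in RocksDBIterator.java: third-party first, java.* last.
    List<String> submitted = Arrays.asList(
        "import com.google.common.base.Preconditions;",
        "import org.rocksdb.RocksIterator;",
        "import java.io.IOException;");
    // Reordered so that java.* comes first, as in RocksDB.java.
    List<String> reordered = Arrays.asList(
        "import java.io.IOException;",
        "import com.google.common.base.Preconditions;",
        "import org.rocksdb.RocksIterator;");
    System.out.println(isOrdered(submitted)); // false
    System.out.println(isOrdered(reordered)); // true
  }
}
```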






[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995270512


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146245/
   




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996385201


   **[Test build #146309 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146309/testReport)** for PR 34913 at commit [`405b84c`](https://github.com/apache/spark/commit/405b84c66ee5bc7154b2ff2583b0b9e90dc00649).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996448366


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146309/
   




[GitHub] [spark] gengliangwang commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996457338


   @dongjoon-hyun just curious about the whole plan: what will happen if a user enables the RocksDB KVStore when there are existing LevelDB files? 




[GitHub] [spark] dongjoon-hyun edited a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun edited a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996489833


   I don't think we need to support that kind of migration, @gengliangwang .
   > what will happen if a user enables the RocksDB KVStore when there are existing LevelDB files?
   
   The main goal in Apache Spark 3.3 is to support new installations on M1-based machines with an additional explicit configuration (the default remains LevelDB). On M1-based machines, neither LevelDB nor RocksDB works, so there is no previous data on those machines.
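The plan described here is an explicit opt-in: LevelDB stays the default and RocksDB is used only when configured, with no migration between the two. A minimal sketch of that selection logic, assuming a string-valued setting; `DiskBackend` and `resolve` are hypothetical names for illustration and are not Spark configuration keys or APIs.

```java
import java.util.Locale;

public class DiskBackendChooser {

  // Hypothetical enum of the two KVStore disk backends discussed in this PR.
  enum DiskBackend { LEVELDB, ROCKSDB }

  // A missing or blank setting falls back to the LevelDB default;
  // RocksDB is chosen only when it is requested explicitly.
  static DiskBackend resolve(String configured) {
    if (configured == null || configured.trim().isEmpty()) {
      return DiskBackend.LEVELDB;
    }
    // valueOf throws IllegalArgumentException for unknown names, surfacing typos early.
    return DiskBackend.valueOf(configured.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(resolve(null));      // LEVELDB
    System.out.println(resolve("rocksdb")); // ROCKSDB
  }
}
```

Failing on an unknown value, rather than silently falling back, keeps a misspelled opt-in from quietly leaving a user on the backend they were trying to avoid.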




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996925873


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50812/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995356509


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146246/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995270512


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146245/
   




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770266189



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
+    .setFormatVersion(5);
+
+  private static final Options dbOptions = new Options()
+    .setCreateIfMissing(true)
+    .setTableFormatConfig(tableFormatConfig)
+    .setStatistics(new Statistics());
+
+  private static final WriteOptions writeOptions = new WriteOptions().setSync(true);
+
+  private AtomicReference<org.rocksdb.RocksDB> _db;

Review comment:
       Should `_db` be declared `final`? It is assigned only once.
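Declaring the field `final` fixes only which `AtomicReference` it points to; the handle held inside can still be cleared when the store closes. A minimal sketch of that pattern, assuming the store nulls out the handle on close (the excerpt above does not show `close()`); `FinalHolderStore` and `NativeHandle` are hypothetical stand-ins for `RocksDB` and `org.rocksdb.RocksDB`.

```java
import java.util.concurrent.atomic.AtomicReference;

public class FinalHolderStore implements AutoCloseable {

  // Hypothetical stand-in for the native org.rocksdb.RocksDB handle.
  static class NativeHandle implements AutoCloseable {
    boolean closed;

    @Override
    public void close() {
      closed = true;
    }
  }

  // The field is final: it always points to the same AtomicReference,
  // while the handle inside can still be swapped out when the store closes.
  private final AtomicReference<NativeHandle> db;

  FinalHolderStore(NativeHandle handle) {
    this.db = new AtomicReference<>(handle);
  }

  // Returns the live handle, failing fast if the store was already closed.
  NativeHandle db() {
    NativeHandle handle = db.get();
    if (handle == null) {
      throw new IllegalStateException("DB is closed.");
    }
    return handle;
  }

  // getAndSet(null) hands the handle to exactly one caller, so close() is idempotent.
  @Override
  public void close() {
    NativeHandle handle = db.getAndSet(null);
    if (handle != null) {
      handle.close();
    }
  }

  public static void main(String[] args) {
    NativeHandle handle = new NativeHandle();
    FinalHolderStore store = new FinalHolderStore(handle);
    store.close();
    store.close(); // second call is a no-op
    System.out.println(handle.closed); // true
  }
}
```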






[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995307680


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50719/
   




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996967732


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50812/
   




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996417456


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50781/
   




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770273263



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;

Review comment:
       Ya, right. The `java.*` imports should come first. Thanks!






[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996441673


   **[Test build #146309 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146309/testReport)** for PR 34913 at commit [`405b84c`](https://github.com/apache/spark/commit/405b84c66ee5bc7154b2ff2583b0b9e90dc00649).
    * This patch **fails PySpark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995294771


   **[Test build #146246 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146246/testReport)** for PR 34913 at commit [`dd3d009`](https://github.com/apache/spark/commit/dd3d0099ac57654ef22f834b26ad34e8480adabf).




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995330733


   BTW, the Scala 2.13 failure is unrelated to this PR. Its root cause was reverted from the master branch 25 minutes ago.
   - https://github.com/apache/spark/commit/16841286ecb31446300c81e3f98516e92846d9fa




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-997017065


   **[Test build #146338 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146338/testReport)** for PR 34913 at commit [`abb6b6d`](https://github.com/apache/spark/commit/abb6b6dc102ffd4a214b9a016f21edfb7929f901).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770303774



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.*;
+
+class RocksDBIterator<T> implements KVStoreIterator<T> {
+
+  private final RocksDB db;
+  private final boolean ascending;
+  private final RocksIterator it;
+  private final Class<T> type;
+  private final RocksDBTypeInfo ti;
+  private final RocksDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  RocksDBIterator(Class<T> type, RocksDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().newIterator();
+    this.type = type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.isValid() && compare(it.key(), indexKeyPrefix) <= 0) {
+        // continue
+      } else {
+        it.prev();
+      }
+    }
+    this.end = end;
+
+    if (params.skip > 0) {
+      skip(params.skip);
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!checkedNext && !closed) {
+      next = loadNext();
+      checkedNext = true;
+    }
+    if (!closed && next == null) {
+      try {
+        close();
+      } catch (IOException ioe) {
+        throw Throwables.propagate(ioe);
+      }
+    }
+    return next != null;
+  }
+
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    checkedNext = false;
+
+    try {
+      T ret;
+      if (index == null || index.isCopy()) {
+        ret = db.serializer.deserialize(next, type);
+      } else {
+        byte[] key = ti.buildKey(false, ti.naturalIndex().keyPrefix(null), next);
+        ret = db.get(key, type);
+      }
+      next = null;
+      return ret;
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<T> next(int max) {
+    List<T> list = new ArrayList<>(max);
+    while (hasNext() && list.size() < max) {
+      list.add(next());
+    }
+    return list;
+  }
+
+  @Override
+  public boolean skip(long n) {
+    long skipped = 0;
+    while (skipped < n) {
+      if (next != null) {
+        checkedNext = false;
+        next = null;
+        skipped++;
+        continue;
+      }
+
+      if (!it.isValid()) {
+        checkedNext = true;
+        return false;
+      }
+
+      if (!isEndMarker(it.key())) {
+        skipped++;
+      }
+      if (ascending) {
+        it.next();
+      } else {
+        it.prev();
+      }
+    }
+
+    return hasNext();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    db.notifyIteratorClosed(this);
+    if (!closed) {
+      it.close();
+      closed = true;
+    }
+  }
+
+  /**
+   * Because it's tricky to expose closeable iterators through many internal APIs, especially
+   * when Scala wrappers are used, this makes sure that, hopefully, the JNI resources held by
+   * the iterator will eventually be released.
+   */
+  @SuppressWarnings("deprecation")

Review comment:
       nit: we can remove this redundant suppression 
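The `hasNext()` / `next()` / `skip()` methods quoted above use a look-ahead pattern. `hasNext()` prefetches one element into `next`, `next()` consumes it, and the underlying native iterator is closed as soon as the source runs dry. Below is a minimal, stdlib-only sketch of that pattern. It assumes a plain `java.util.Iterator` in place of `RocksIterator` and leaves out deserialization; `LookaheadIterator` and `isClosed()` are hypothetical names. Like the original, it treats `null` as the end-of-data marker, so the source must not yield `null` elements.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical look-ahead wrapper mirroring RocksDBIterator's hasNext/next/skip logic. */
final class LookaheadIterator<T> implements Iterator<T>, Closeable {
  private final Iterator<T> source;  // stands in for the native RocksIterator
  private boolean checkedNext;       // true once `next` holds the prefetched element (or null)
  private T next;
  private boolean closed;

  LookaheadIterator(Iterator<T> source) {
    this.source = source;
  }

  private T loadNext() {
    return source.hasNext() ? source.next() : null;
  }

  @Override
  public boolean hasNext() {
    if (!checkedNext && !closed) {
      next = loadNext();
      checkedNext = true;
    }
    if (!closed && next == null) {
      close();  // release resources eagerly once the source is exhausted
    }
    return next != null;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    checkedNext = false;  // the next hasNext() call must prefetch again
    T ret = next;
    next = null;
    return ret;
  }

  /** Returns up to `max` elements, like KVStoreIterator.next(int). */
  List<T> next(int max) {
    List<T> list = new ArrayList<>(max);
    while (list.size() < max && hasNext()) {
      list.add(next());
    }
    return list;
  }

  /** Skips up to n elements and reports whether more elements remain. */
  boolean skip(long n) {
    for (long i = 0; i < n; i++) {
      if (!hasNext()) {
        return false;
      }
      next();
    }
    return hasNext();
  }

  boolean isClosed() {
    return closed;
  }

  @Override
  public void close() {
    closed = true;  // the real class would close the native iterator here
  }
}
```

The sketch differs from the quoted code in two small ways. First, its `next(int max)` checks `list.size() < max` before `hasNext()`, so filling a batch does not prefetch an extra element. The quoted version checks `hasNext()` first, so it loads one more entry than it returns when a batch fills up. Second, the sketch ignores the end-marker keys that the real `skip()` steps over.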






[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995354353


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50720/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-997018139


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146338/
   




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770295084



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.*;
+
+class RocksDBIterator<T> implements KVStoreIterator<T> {
+
+  private final RocksDB db;
+  private final boolean ascending;
+  private final RocksIterator it;
+  private final Class<T> type;
+  private final RocksDBTypeInfo ti;
+  private final RocksDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  RocksDBIterator(Class<T> type, RocksDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().newIterator();
+    this.type = type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.isValid() && compare(it.key(), indexKeyPrefix) <= 0) {
+        // continue
+      } else {
+        it.prev();

Review comment:
       `LevelDBIterator` calls `it.peekNext().getKey()` to get `nextKey`, and `peekNext` throws `NoSuchElementException` if `iterator.isValid()` is `false`.
   
   For `RocksIterator`, this code calls `it.prev()` when `it.isValid()` is `false`. Could that operation throw a different type of exception?
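One way to think about this question is through the iterator contract. RocksDB's C++ `Iterator` interface documents `Prev()` as requiring `Valid()`. So when `seek(end)` runs past the last key, the iterator probably needs to be repositioned with `seekToLast()` rather than stepped back with `prev()`. Alternatively, the whole sequence can be replaced with `seekForPrev()`, which RocksJava also exposes.

The sketch below models that positioning with a `NavigableMap<String, ?>` instead of a real `RocksIterator`, so that it runs without the JNI library. That substitution is an assumption, and `String` ordering stands in for RocksDB's bytewise comparator. In the model, `ceilingKey` plays the role of `seek`, `lastKey` of `seekToLast`, `lowerKey` of `prev`, and `floorKey` of `seekForPrev`. `DescendingStart` is a hypothetical name.

```java
import java.util.NavigableMap;

/** Hypothetical model of positioning an iterator for a descending scan. */
final class DescendingStart {
  private DescendingStart() {}

  /** seek(end), then step back; the invalid case repositions instead of calling prev(). */
  static String viaSeek(NavigableMap<String, ?> keys, String end) {
    String k = keys.ceilingKey(end);                  // like it.seek(end)
    if (k == null) {                                  // invalid: ran past the last key
      return keys.isEmpty() ? null : keys.lastKey();  // like it.seekToLast()
    }
    if (k.compareTo(end) > 0) {
      return keys.lowerKey(k);                        // like it.prev(); null means invalid
    }
    return k;                                         // exact match, already in place
  }

  /** The same position in one step, like it.seekForPrev(end). */
  static String viaSeekForPrev(NavigableMap<String, ?> keys, String end) {
    return keys.floorKey(end);
  }
}
```

Both methods land on the greatest key `<= end` (or report "invalid" as `null`). The single `seekForPrev` call avoids ever calling `prev()` on an invalid iterator.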
   
   
   
   






[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995321855


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50719/
   




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995277815


   cc @viirya, @HyukjinKwon, @LuciferYang




[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995354353


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50720/
   




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770303061



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
+    .setFormatVersion(5);
+
+  private static final Options dbOptions = new Options()
+    .setCreateIfMissing(true)
+    .setTableFormatConfig(tableFormatConfig)
+    .setStatistics(new Statistics());
+
+  private static final WriteOptions writeOptions = new WriteOptions().setSync(true);
+
+  private AtomicReference<org.rocksdb.RocksDB> _db;
+
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, RocksDBTypeInfo> types;
+
+  /**
+   * Trying to close a JNI RocksDB handle with a closed DB causes JVM crashes. This is used to
+   * ensure that all iterators are correctly closed before RocksDB is closed. Use weak references
+   * to ensure that the iterator can be GCed, when it is only referenced here.
+   */
+  private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+  public RocksDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public RocksDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+    this._db = new AtomicReference<>(org.rocksdb.RocksDB.open(dbOptions, path.toString()));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        close();
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+
+    iteratorTracker = new ConcurrentLinkedQueue<>();
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    RocksDBTypeInfo ti = getTypeInfo(value.getClass());
+    byte[] data = serializer.serialize(value);
+    synchronized (ti) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        updateBatch(writeBatch, value, data, value.getClass(), ti.naturalIndex(), ti.indices());
+        db().write(writeOptions, writeBatch);
+      }
+    }
+  }
+
+  public void writeAll(List<?> values) throws Exception {
+    Preconditions.checkArgument(values != null && !values.isEmpty(),
+      "Non-empty values required.");
+
+    // Group by class, in case there are values from different classes in the values
+    // Typical usecase is for this to be a single class.
+    // A NullPointerException will be thrown if values contain null object.
+    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
+        values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
+
+      final Iterator<?> valueIter = entry.getValue().iterator();
+      final Iterator<byte[]> serializedValueIter;
+
+      // Serialize outside synchronized block
+      List<byte[]> list = new ArrayList<>(entry.getValue().size());
+      for (Object value : entry.getValue()) {
+        list.add(serializer.serialize(value));
+      }
+      serializedValueIter = list.iterator();
+
+      final Class<?> klass = entry.getKey();
+      final RocksDBTypeInfo ti = getTypeInfo(klass);
+
+      synchronized (ti) {
+        final RocksDBTypeInfo.Index naturalIndex = ti.naturalIndex();
+        final Collection<RocksDBTypeInfo.Index> indices = ti.indices();
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+          while (valueIter.hasNext()) {
+            updateBatch(writeBatch, valueIter.next(), serializedValueIter.next(), klass,
+                    naturalIndex, indices);
+          }
+          db().write(writeOptions, writeBatch);
+        }
+      }
+    }
+  }
+
+  private void updateBatch(
+      WriteBatch batch,
+      Object value,
+      byte[] data,
+      Class<?> klass,
+      RocksDBTypeInfo.Index naturalIndex,
+      Collection<RocksDBTypeInfo.Index> indices) throws Exception {
+    Object existing;
+    try {
+      existing = get(naturalIndex.entityKey(null, value), klass);
+    } catch (NoSuchElementException e) {
+      existing = null;
+    }
+
+    PrefixCache cache = new PrefixCache(value);
+    byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
+    for (RocksDBTypeInfo.Index idx : indices) {
+      byte[] prefix = cache.getPrefix(idx);
+      idx.add(batch, value, existing, data, naturalKey, prefix);
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    try (WriteBatch writeBatch = new WriteBatch()) {
+      RocksDBTypeInfo ti = getTypeInfo(type);
+      byte[] key = ti.naturalIndex().start(null, naturalKey);
+      synchronized (ti) {
+        byte[] data = db().get(key);
+        if (data != null) {
+          Object existing = serializer.deserialize(data, type);
+          PrefixCache cache = new PrefixCache(existing);
+          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
+          for (RocksDBTypeInfo.Index idx : ti.indices()) {
+            idx.remove(writeBatch, existing, keyBytes, cache.getPrefix(idx));
+          }
+          db().write(writeOptions, writeBatch);
+        }
+      }
+    } catch (NoSuchElementException nse) {
+      // Ignore.
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    return new KVStoreView<T>() {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          RocksDBIterator<T> it = new RocksDBIterator<>(type, RocksDB.this, this);
+          iteratorTracker.add(new WeakReference<>(it));
+          return it;
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    RocksDBTypeInfo.Index naturalIndex = getTypeInfo(klass).naturalIndex();
+    boolean removed = false;
+    KVStoreView<T> view = view(klass).index(index);
+
+    for (Object indexValue : indexValues) {
+      for (T value: view.first(indexValue).last(indexValue)) {
+        Object itemKey = naturalIndex.getValue(value);
+        delete(klass, itemKey);
+        removed = true;
+      }
+    }
+
+    return removed;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    RocksDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
+    return idx.getCount(idx.end(null));
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    RocksDBTypeInfo.Index idx = getTypeInfo(type).index(index);
+    return idx.getCount(idx.end(null, indexedValue));
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this._db) {
+      org.rocksdb.RocksDB _db = this._db.getAndSet(null);
+      if (_db == null) {
+        return;
+      }
+
+      try {
+        if (iteratorTracker != null) {
+          for (Reference<RocksDBIterator<?>> ref: iteratorTracker) {
+            RocksDBIterator<?> it = ref.get();
+            if (it != null) {
+              it.close();
+            }
+          }
+        }
+        _db.close();
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
+   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
+   */
+  void closeIterator(RocksDBIterator<?> it) throws IOException {
+    notifyIteratorClosed(it);
+    synchronized (this._db) {
+      org.rocksdb.RocksDB _db = this._db.get();
+      if (_db != null) {
+        it.close();
+      }
+    }
+  }
+
+  /**
+   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
+   * iterator is closed.
+   */
+  void notifyIteratorClosed(RocksDBIterator<?> it) {
+    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  }
+
+  /** Returns metadata about indices for the given type. */
+  RocksDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
+    RocksDBTypeInfo ti = types.get(type);
+    if (ti == null) {
+      RocksDBTypeInfo tmp = new RocksDBTypeInfo(this, type, getTypeAlias(type));
+      ti = types.putIfAbsent(type, tmp);
+      if (ti == null) {
+        ti = tmp;
+      }
+    }
+    return ti;
+  }
+
+  /**
+   * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
+   * prevent methods that retrieved the instance from using it after close, but hopefully will
+   * catch most cases; otherwise, we'll need some kind of locking.
+   */
+  org.rocksdb.RocksDB db() {
+    org.rocksdb.RocksDB _db = this._db.get();
+    if (_db == null) {
+      throw new IllegalStateException("DB is closed.");
+    }
+    return _db;
+  }
+
+  private byte[] getTypeAlias(Class<?> klass) throws Exception {
+    byte[] alias = typeAliases.get(klass.getName());
+    if (alias == null) {
+      synchronized (typeAliases) {
+        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
+        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
+        if (alias == null) {
+          alias = tmp;
+          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
+        }
+      }
+    }
+    return alias;
+  }
+
+  /** Needs to be public for Jackson. */
+  public static class TypeAliases {
+
+    public Map<String, byte[]> aliases;
+
+    TypeAliases(Map<String, byte[]> aliases) {
+      this.aliases = aliases;
+    }
+
+    TypeAliases() {
+      this(null);
+    }
+
+  }
+
+  private static class PrefixCache {

Review comment:
       OK ~
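`writeAll()` groups its input by runtime class, serializes each group before taking the per-type lock, and then passes every (value, bytes) pair to `updateBatch`. The sketch below models only that preparation step, using the standard library. Its assumptions:

- `serialize` is a hypothetical stand-in for `KVStoreSerializer.serialize`.
- The returned pairs stand in for the `updateBatch` calls.
- The lock and the `WriteBatch` are left out.

The grouping uses a loop over a `LinkedHashMap`. This produces the same groups as `Collectors.groupingBy(Object::getClass)`, with a deterministic class order. Each group's own values are serialized, which keeps the value iterator and the serialized-bytes iterator the same length when the input mixes classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of writeAll's group-then-serialize step. */
final class GroupedWrites {
  private GroupedWrites() {}

  /** Returns, per class, the (value, serialized bytes) pairs in input order. */
  static Map<Class<?>, List<Map.Entry<Object, byte[]>>> prepare(
      List<?> values, Function<Object, byte[]> serialize) {
    // Group by runtime class; value.getClass() throws NullPointerException on a null element.
    Map<Class<?>, List<Object>> groups = new LinkedHashMap<>();
    for (Object value : values) {
      groups.computeIfAbsent(value.getClass(), k -> new ArrayList<>()).add(value);
    }

    Map<Class<?>, List<Map.Entry<Object, byte[]>>> batches = new LinkedHashMap<>();
    for (Map.Entry<Class<?>, List<Object>> entry : groups.entrySet()) {
      // Serialize this group's values only, so both iterators below have the same length.
      List<byte[]> serialized = new ArrayList<>(entry.getValue().size());
      for (Object value : entry.getValue()) {
        serialized.add(serialize.apply(value));
      }
      Iterator<Object> valueIter = entry.getValue().iterator();
      Iterator<byte[]> dataIter = serialized.iterator();
      List<Map.Entry<Object, byte[]>> batch = new ArrayList<>();
      while (valueIter.hasNext()) {
        batch.add(Map.entry(valueIter.next(), dataIter.next()));  // one updateBatch call each
      }
      batches.put(entry.getKey(), batch);
    }
    return batches;
  }
}
```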

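The `iteratorTracker` comment and the `close()` and `notifyIteratorClosed()` methods in `RocksDB.java` describe a shutdown protocol:

- Iterators are registered through `WeakReference`s, so an abandoned iterator can still be garbage-collected.
- An iterator unregisters itself when it closes.
- `close()` swaps the handle out of the `AtomicReference` and closes every still-reachable iterator before the database itself.

Below is a stdlib-only sketch of that protocol. `TrackingStore` and `TrackedIterator` are hypothetical classes, and a plain `Object` stands in for the native `org.rocksdb.RocksDB` handle.

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical store that closes tracked iterators before its own handle. */
final class TrackingStore implements AutoCloseable {
  /** Stand-in for the native DB handle; null once the store is closed. */
  private final AtomicReference<Object> handle = new AtomicReference<>(new Object());
  private final ConcurrentLinkedQueue<Reference<TrackedIterator>> tracker =
      new ConcurrentLinkedQueue<>();

  TrackedIterator newIterator() {
    if (handle.get() == null) {
      throw new IllegalStateException("DB is closed.");  // same guard as db()
    }
    TrackedIterator it = new TrackedIterator(this);
    tracker.add(new WeakReference<>(it));  // weak, so an abandoned iterator can be GCed
    return it;
  }

  /** Called by an iterator when it closes. */
  void notifyIteratorClosed(TrackedIterator it) {
    tracker.removeIf(ref -> it.equals(ref.get()));
  }

  int trackedCount() {
    return tracker.size();
  }

  @Override
  public void close() {
    synchronized (handle) {
      if (handle.getAndSet(null) == null) {
        return;  // already closed
      }
      // Close iterators first; ConcurrentLinkedQueue tolerates removal during iteration.
      for (Reference<TrackedIterator> ref : tracker) {
        TrackedIterator it = ref.get();
        if (it != null) {
          it.close();
        }
      }
      // The real code would close the native DB handle here.
    }
  }
}

/** Hypothetical iterator whose close() is idempotent and unregisters itself. */
final class TrackedIterator implements AutoCloseable {
  private final TrackingStore store;
  private boolean closed;

  TrackedIterator(TrackingStore store) {
    this.store = store;
  }

  synchronized boolean isClosed() {
    return closed;
  }

  @Override
  public synchronized void close() {
    store.notifyIteratorClosed(this);
    if (!closed) {
      closed = true;  // the real class would release the native iterator here
    }
  }
}
```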

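`getTypeAlias()` maps each class name to the decimal string of the alias map's size at the moment the class is first seen. Long names that share the `org.apache.spark.` prefix therefore become short keys such as `0`, `1`, `2`. Below is a stdlib-only sketch of that scheme, assuming an in-memory map. The `persist` callback is a hypothetical stand-in for `put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases))`, and `TypeAliasRegistry` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical sketch of RocksDB.getTypeAlias: class name -> short, stable ID. */
final class TypeAliasRegistry {
  private final ConcurrentMap<String, byte[]> aliases = new ConcurrentHashMap<>();
  private final Consumer<Map<String, byte[]>> persist;  // stand-in for writing TYPE_ALIASES_KEY

  TypeAliasRegistry(Map<String, byte[]> loaded, Consumer<Map<String, byte[]>> persist) {
    this.aliases.putAll(loaded);
    this.persist = persist;
  }

  byte[] aliasFor(Class<?> klass) {
    byte[] alias = aliases.get(klass.getName());  // fast path without locking
    if (alias == null) {
      synchronized (aliases) {
        // All insertions happen under this lock, so size() yields a fresh ID.
        byte[] tmp = String.valueOf(aliases.size()).getBytes(StandardCharsets.UTF_8);
        alias = aliases.putIfAbsent(klass.getName(), tmp);
        if (alias == null) {
          alias = tmp;
          persist.accept(aliases);  // persist only when a new alias was added
        }
      }
    }
    return alias;
  }
}
```

In this sketch, the `putIfAbsent` inside the lock is what makes a concurrent first lookup of the same class safe. The second caller sees the alias the first one stored and does not persist again.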

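The constructor's version check either stamps a new store with `STORE_VERSION` or refuses to open one written with a different version. The sketch below reproduces that branch over a `Map<String, byte[]>`. It makes two illustrative assumptions:

- The version is stored as the decimal string of the `long`; the real bytes are whatever `KVStoreSerializer` produces.
- `IllegalStateException` stands in for `UnsupportedStoreVersionException`.

`VersionCheck` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical sketch of the STORE_VERSION check performed when a store is opened. */
final class VersionCheck {
  static final long STORE_VERSION = 1L;
  static final String STORE_VERSION_KEY = "__version__";

  private VersionCheck() {}

  /** Stamps a store that has no version yet, or throws if it holds a different version. */
  static void checkOrStamp(Map<String, byte[]> db) {
    byte[] versionData = db.get(STORE_VERSION_KEY);
    if (versionData != null) {
      long version = Long.parseLong(new String(versionData, StandardCharsets.UTF_8));
      if (version != STORE_VERSION) {
        // The real constructor closes the DB first, then throws UnsupportedStoreVersionException.
        throw new IllegalStateException("Unsupported store version: " + version);
      }
    } else {
      db.put(STORE_VERSION_KEY, String.valueOf(STORE_VERSION).getBytes(StandardCharsets.UTF_8));
    }
  }
}
```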



[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995284843


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50719/
   




[GitHub] [spark] SparkQA removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995294771


   **[Test build #146246 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146246/testReport)** for PR 34913 at commit [`dd3d009`](https://github.com/apache/spark/commit/dd3d0099ac57654ef22f834b26ad34e8480adabf).




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996373703


   Thank you, @LuciferYang. I addressed your comments.




[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996448366


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146309/
   




[GitHub] [spark] viirya commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r771206987



##########
File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBSuite.java
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.rocksdb.RocksIterator;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.assumeFalse;
+
+public class RocksDBSuite {
+
+  private RocksDB db;
+  private File dbpath;
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    assumeFalse(SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64"));

Review comment:
       Should we add a JIRA number here to track this? I think we want to remove this check once RocksDB JNI supports Apple Silicon.
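For reference, the skipped condition can also be written with plain system properties, without going through commons-lang3. This sketch assumes two things, matching what `SystemUtils` checks: `os.name` starts with `Mac OS X` on macOS, and a native Apple Silicon JVM reports `os.arch` as `aarch64`. `PlatformCheck` is a hypothetical name.

```java
/** Hypothetical stdlib-only equivalent of the assumeFalse(...) condition in RocksDBSuite.setup(). */
final class PlatformCheck {
  private PlatformCheck() {}

  /** Mirrors SystemUtils.IS_OS_MAC_OSX && SystemUtils.OS_ARCH.equals("aarch64"), null-safely. */
  static boolean isMacOnAarch64(String osName, String osArch) {
    return osName != null && osName.startsWith("Mac OS X") && "aarch64".equals(osArch);
  }

  /** Evaluates the condition for the running JVM. */
  static boolean isMacOnAarch64() {
    return isMacOnAarch64(System.getProperty("os.name"), System.getProperty("os.arch"));
  }
}
```

In the suite, this would read `assumeFalse(PlatformCheck.isMacOnAarch64());`. A single named helper may be easier to find and delete once the platform is supported.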






[GitHub] [spark] mridulm commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-997158227


   +CC @thejdeep, @shardulm94 FYI




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995355843


   **[Test build #146246 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146246/testReport)** for PR 34913 at commit [`dd3d009`](https://github.com/apache/spark/commit/dd3d0099ac57654ef22f834b26ad34e8480adabf).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770297766



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.*;
+
+class RocksDBIterator<T> implements KVStoreIterator<T> {
+
+  private final RocksDB db;
+  private final boolean ascending;
+  private final RocksIterator it;
+  private final Class<T> type;
+  private final RocksDBTypeInfo ti;
+  private final RocksDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  RocksDBIterator(Class<T> type, RocksDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().newIterator();
+    this.type = type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.isValid() && compare(it.key(), indexKeyPrefix) <= 0) {
+        // continue
+      } else {
+        it.prev();

Review comment:
       So should we change it to:
   
   ```
   if (!it.isValid()) {
     throw new NoSuchElementException();
   }
   if (compare(it.key(), indexKeyPrefix) > 0) {
     it.prev();
   }
   ```
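   To make the suggested positioning logic concrete, here is a toy Java model, not the real iterator. `String` keys stand in for `byte[]`, a `TreeSet` stands in for the RocksDB key space, `seek` moves to the first key at or after the target (as `RocksIterator.seek` does), and `\uffff` is an assumed stand-in for the index's end marker. `DescendingSeekModel` and its methods are hypothetical names.

   ```java
   import java.util.NavigableSet;
   import java.util.NoSuchElementException;

   /**
    * Toy model (hypothetical) of positioning a descending scan. String keys replace byte[],
    * and a NavigableSet replaces the RocksDB key space.
    */
   final class DescendingSeekModel {

     private final NavigableSet<String> keys;
     private String current; // null means "not valid"

     DescendingSeekModel(NavigableSet<String> keys) {
       this.keys = keys;
     }

     // Like RocksIterator.seek: position at the first key >= target, or become invalid.
     void seek(String target) {
       current = keys.ceiling(target);
     }

     boolean isValid() {
       return current != null;
     }

     String key() {
       return current;
     }

     // Like RocksIterator.prev: only called on a valid cursor in this model.
     void prev() {
       current = keys.lower(current);
     }

     /**
      * Applies the suggested check and returns the first key of the descending scan,
      * or null if stepping back leaves no earlier key.
      */
     String positionForDescending(String end, String indexKeyPrefix) {
       seek(end);
       if (!isValid()) {
         throw new NoSuchElementException();
       }
       // Any key sorting after the prefix is treated as an overshoot; step back one entry.
       if (key().compareTo(indexKeyPrefix) > 0) {
         prev();
       }
       return key();
     }
   }
   ```

   In this model, seeking to `b\uffff` over the keys `a1, a2, b1, b2, c1` lands on `c1`, which sorts after the prefix `b`, so one `prev()` moves the cursor to `b2`, the first key of a descending scan. If no key sorts after the region (for example, only `a1, b1, b2`), the seek leaves the cursor invalid and the suggested check throws `NoSuchElementException` even though the `b` region is non-empty. It is worth confirming whether the real key layout can reach that case.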






[GitHub] [spark] LuciferYang commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996502647


   > @dongjoon-hyun just curious about the whole plan: what will happen if a user enables the RocksDB KVStore when there are existing LevelDB files?
   
   As a user who runs the Spark History Server at a relatively large scale (about 150,000+ Spark apps every day), I am fine with deleting the `LevelDB` instances and replaying them during the History Server upgrade process.
   
   




[GitHub] [spark] BelodengKlaus commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
BelodengKlaus commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996474285


   
   
   
   > @BelodengKlaus
   > 
   > > The performance results show that LevelDB is faster than RocksDB. Once RocksDB supports Apple Silicon, might it be better than LevelDB? (Or is the only reason that LevelDB has become inactive?)
   > 
   > Yes. Currently, neither DB works at all with a native Java distribution on Apple Silicon.
   > 
   > In Apache Spark 3.3, we will have both backends, and users can choose either one for their use cases. Performance tuning will follow as well.
   
   LGTM




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770259068



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()

Review comment:
       Thank you for your explanation.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996982126


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50812/
   




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996901346


   **[Test build #146338 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146338/testReport)** for PR 34913 at commit [`abb6b6d`](https://github.com/apache/spark/commit/abb6b6dc102ffd4a214b9a016f21edfb7929f901).




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996489833


   I don't think we need to support that kind of migration, @gengliangwang.
   > what will happen if a user enables the RocksDB KVStore when there are existing LevelDB files?
   
   The main goal in Apache Spark 3.3 is to support new installations on M1-based machines. On M1-based machines, neither LevelDB nor RocksDB works, so there is no previous data on those machines.




[GitHub] [spark] LuciferYang commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995449693


   Very valuable changes. I will take a look.
   
   




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995264115


   **[Test build #146245 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146245/testReport)** for PR 34913 at commit [`4b603fd`](https://github.com/apache/spark/commit/4b603fd34c571d65da3e060d617b4694707b7b29).




[GitHub] [spark] viirya commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995307541


   Thanks. I will take a look.




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996373840


   Could you review this once more, @viirya and @LuciferYang?




[GitHub] [spark] SparkQA removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995264115


   **[Test build #146245 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146245/testReport)** for PR 34913 at commit [`4b603fd`](https://github.com/apache/spark/commit/4b603fd34c571d65da3e060d617b4694707b7b29).




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770247961



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()

Review comment:
       Thank you for the review, @LuciferYang.
   Actually, `BlockBasedTableConfig` has a set of configurations; I tested some of them and have now cleaned them up.
   When we tune it in the future, this style makes it easier to change only `BlockBasedTableConfig` instead of touching `dbOptions`.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996427890


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50781/
   




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770295084



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.*;
+
+class RocksDBIterator<T> implements KVStoreIterator<T> {
+
+  private final RocksDB db;
+  private final boolean ascending;
+  private final RocksIterator it;
+  private final Class<T> type;
+  private final RocksDBTypeInfo ti;
+  private final RocksDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  RocksDBIterator(Class<T> type, RocksDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().newIterator();
+    this.type = type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.isValid() && compare(it.key(), indexKeyPrefix) <= 0) {
+        // continue
+      } else {
+        it.prev();

Review comment:
       For `LevelDBIterator`, the code calls `it.peekNext().getKey()` to get `nextKey`, and `peekNext` throws `NoSuchElementException` if `iterator.isValid()` is `false`.
   
   For `RocksIterator`, when `it.isValid()` is `false`, the code calls `it.prev()`. Can this operation throw other exceptions?
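To make the question concrete without native code, here is a hypothetical, JDK-only sketch in which a `TreeMap` ordered by unsigned byte comparison stands in for the RocksDB key space. It finds where a descending scan should start in two ways. The first is "seek, then step back", which special-cases a seek that lands past the last key instead of stepping back from an invalid position. The second is a direct "greatest key <= target" lookup. All class and method names are made up for illustration. RocksJava's `RocksIterator` does offer `seekToLast()` and `seekForPrev(byte[])` for the corresponding moves, but this sketch does not answer whether `prev()` on an invalid iterator is safe; that still needs checking against the RocksDB version in use.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical, JDK-only sketch: a TreeMap stands in for the RocksDB key space so that
 * iterator positioning can be reasoned about without native code. None of these names
 * are RocksJava or Spark APIs.
 */
public class DescendingSeekSketch {

  /** Unsigned, byte-wise lexicographic order (shorter key first on a common prefix). */
  static final Comparator<byte[]> UNSIGNED_LEX = (a, b) -> {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  };

  /** Direct lookup: greatest key <= target, or null if there is none. */
  static byte[] floorStart(NavigableMap<byte[], byte[]> db, byte[] target) {
    Map.Entry<byte[], byte[]> e = db.floorEntry(target);
    return e == null ? null : e.getKey();
  }

  /** "Seek, then step back", with the past-the-end case handled explicitly. */
  static byte[] seekThenStepBack(NavigableMap<byte[], byte[]> db, byte[] target) {
    byte[] k = db.ceilingKey(target); // like seek(): first key >= target
    if (k == null) {
      // Seek landed past the last key ("invalid"): jump to the last key
      // rather than stepping back from an invalid position.
      return db.isEmpty() ? null : db.lastKey();
    }
    // Landed on a key greater than the target: step back one entry, like prev().
    return UNSIGNED_LEX.compare(k, target) > 0 ? db.lowerKey(k) : k;
  }

  /** Runs both strategies on string keys, checks that they agree, returns the start key. */
  static String start(String target, String... keys) {
    NavigableMap<byte[], byte[]> db = new TreeMap<>(UNSIGNED_LEX);
    for (String k : keys) {
      db.put(k.getBytes(StandardCharsets.UTF_8), new byte[0]);
    }
    byte[] t = target.getBytes(StandardCharsets.UTF_8);
    byte[] viaFloor = floorStart(db, t);
    byte[] viaSeek = seekThenStepBack(db, t);
    if (!Arrays.equals(viaFloor, viaSeek)) {
      throw new IllegalStateException("strategies disagree for " + target);
    }
    return viaFloor == null ? null : new String(viaFloor, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    System.out.println(start("b", "a", "c", "e")); // a
    System.out.println(start("z", "a", "c", "e")); // e: seek lands past the end
    System.out.println(start("0", "a", "c", "e")); // null: no key <= "0"
  }
}
```

The explicit past-the-end branch is what lets the "seek, then step back" form avoid stepping back from an invalid position at all.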
   
   
   
   




[GitHub] [spark] dongjoon-hyun commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770272892



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
+    .setFormatVersion(5);
+
+  private static final Options dbOptions = new Options()
+    .setCreateIfMissing(true)
+    .setTableFormatConfig(tableFormatConfig)
+    .setStatistics(new Statistics());
+
+  private static final WriteOptions writeOptions = new WriteOptions().setSync(true);
+
+  private AtomicReference<org.rocksdb.RocksDB> _db;
+
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, RocksDBTypeInfo> types;
+
+  /**
+   * Trying to close a JNI RocksDB handle with a closed DB causes JVM crashes. This is used to
+   * ensure that all iterators are correctly closed before RocksDB is closed. Use weak references
+   * to ensure that the iterator can be GCed, when it is only referenced here.
+   */
+  private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+  public RocksDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public RocksDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+    this._db = new AtomicReference<>(org.rocksdb.RocksDB.open(dbOptions, path.toString()));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        close();
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+
+    iteratorTracker = new ConcurrentLinkedQueue<>();
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    RocksDBTypeInfo ti = getTypeInfo(value.getClass());
+    byte[] data = serializer.serialize(value);
+    synchronized (ti) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        updateBatch(writeBatch, value, data, value.getClass(), ti.naturalIndex(), ti.indices());
+        db().write(writeOptions, writeBatch);
+      }
+    }
+  }
+
+  public void writeAll(List<?> values) throws Exception {
+    Preconditions.checkArgument(values != null && !values.isEmpty(),
+      "Non-empty values required.");
+
+    // Group by class, in case there are values from different classes in the values
+    // Typical usecase is for this to be a single class.
+    // A NullPointerException will be thrown if values contain null object.
+    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
+        values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
+
+      final Iterator<?> valueIter = entry.getValue().iterator();
+      final Iterator<byte[]> serializedValueIter;
+
+      // Deserialize outside synchronized block
+      List<byte[]> list = new ArrayList<>(entry.getValue().size());
+      for (Object value : values) {
+        list.add(serializer.serialize(value));
+      }
+      serializedValueIter = list.iterator();
+
+      final Class<?> klass = entry.getKey();
+      final RocksDBTypeInfo ti = getTypeInfo(klass);
+
+      synchronized (ti) {
+        final RocksDBTypeInfo.Index naturalIndex = ti.naturalIndex();
+        final Collection<RocksDBTypeInfo.Index> indices = ti.indices();
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+          while (valueIter.hasNext()) {
+            updateBatch(writeBatch, valueIter.next(), serializedValueIter.next(), klass,
+                    naturalIndex, indices);
+          }
+          db().write(writeOptions, writeBatch);
+        }
+      }
+    }
+  }
+
+  private void updateBatch(
+      WriteBatch batch,
+      Object value,
+      byte[] data,
+      Class<?> klass,
+      RocksDBTypeInfo.Index naturalIndex,
+      Collection<RocksDBTypeInfo.Index> indices) throws Exception {
+    Object existing;
+    try {
+      existing = get(naturalIndex.entityKey(null, value), klass);
+    } catch (NoSuchElementException e) {
+      existing = null;
+    }
+
+    PrefixCache cache = new PrefixCache(value);
+    byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
+    for (RocksDBTypeInfo.Index idx : indices) {
+      byte[] prefix = cache.getPrefix(idx);
+      idx.add(batch, value, existing, data, naturalKey, prefix);
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    try (WriteBatch writeBatch = new WriteBatch()) {
+      RocksDBTypeInfo ti = getTypeInfo(type);
+      byte[] key = ti.naturalIndex().start(null, naturalKey);
+      synchronized (ti) {
+        byte[] data = db().get(key);
+        if (data != null) {
+          Object existing = serializer.deserialize(data, type);
+          PrefixCache cache = new PrefixCache(existing);
+          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
+          for (RocksDBTypeInfo.Index idx : ti.indices()) {
+            idx.remove(writeBatch, existing, keyBytes, cache.getPrefix(idx));
+          }
+          db().write(writeOptions, writeBatch);
+        }
+      }
+    } catch (NoSuchElementException nse) {
+      // Ignore.
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    return new KVStoreView<T>() {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          RocksDBIterator<T> it = new RocksDBIterator<>(type, RocksDB.this, this);
+          iteratorTracker.add(new WeakReference<>(it));
+          return it;
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    RocksDBTypeInfo.Index naturalIndex = getTypeInfo(klass).naturalIndex();
+    boolean removed = false;
+    KVStoreView<T> view = view(klass).index(index);
+
+    for (Object indexValue : indexValues) {
+      for (T value: view.first(indexValue).last(indexValue)) {
+        Object itemKey = naturalIndex.getValue(value);
+        delete(klass, itemKey);
+        removed = true;
+      }
+    }
+
+    return removed;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    RocksDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
+    return idx.getCount(idx.end(null));
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    RocksDBTypeInfo.Index idx = getTypeInfo(type).index(index);
+    return idx.getCount(idx.end(null, indexedValue));
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this._db) {
+      org.rocksdb.RocksDB _db = this._db.getAndSet(null);
+      if (_db == null) {
+        return;
+      }
+
+      try {
+        if (iteratorTracker != null) {
+          for (Reference<RocksDBIterator<?>> ref: iteratorTracker) {
+            RocksDBIterator<?> it = ref.get();
+            if (it != null) {
+              it.close();
+            }
+          }
+        }
+        _db.close();
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
+   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
+   */
+  void closeIterator(RocksDBIterator<?> it) throws IOException {
+    notifyIteratorClosed(it);
+    synchronized (this._db) {
+      org.rocksdb.RocksDB _db = this._db.get();
+      if (_db != null) {
+        it.close();
+      }
+    }
+  }
+
+  /**
+   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
+   * iterator is closed.
+   */
+  void notifyIteratorClosed(RocksDBIterator<?> it) {
+    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  }
+
+  /** Returns metadata about indices for the given type. */
+  RocksDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
+    RocksDBTypeInfo ti = types.get(type);
+    if (ti == null) {
+      RocksDBTypeInfo tmp = new RocksDBTypeInfo(this, type, getTypeAlias(type));
+      ti = types.putIfAbsent(type, tmp);
+      if (ti == null) {
+        ti = tmp;
+      }
+    }
+    return ti;
+  }
+
+  /**
+   * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
+   * prevent methods that retrieved the instance from using it after close, but hopefully will
+   * catch most cases; otherwise, we'll need some kind of locking.
+   */
+  org.rocksdb.RocksDB db() {
+    org.rocksdb.RocksDB _db = this._db.get();
+    if (_db == null) {
+      throw new IllegalStateException("DB is closed.");
+    }
+    return _db;
+  }
+
+  private byte[] getTypeAlias(Class<?> klass) throws Exception {
+    byte[] alias = typeAliases.get(klass.getName());
+    if (alias == null) {
+      synchronized (typeAliases) {
+        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
+        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
+        if (alias == null) {
+          alias = tmp;
+          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
+        }
+      }
+    }
+    return alias;
+  }
+
+  /** Needs to be public for Jackson. */
+  public static class TypeAliases {
+
+    public Map<String, byte[]> aliases;
+
+    TypeAliases(Map<String, byte[]> aliases) {
+      this.aliases = aliases;
+    }
+
+    TypeAliases() {
+      this(null);
+    }
+
+  }
+
+  private static class PrefixCache {

Review comment:
       Yes, exactly, @LuciferYang. You can run `diff LevelDB.java RocksDB.java` to see the differences.
   We are going to remove LevelDB in the future.
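One part of the quoted `RocksDB.java` hunk that is worth understanding alongside that diff is the iterator tracking: open iterators are held through weak references so that `close()` can close every live iterator before the native handle, since closing a JNI iterator after its DB can crash the JVM. Below is a hypothetical, JDK-only sketch of that pattern. `NativeHandle` and `TrackedIterator` are stand-ins, not RocksJava or Spark classes, and the "crash" is modeled as an exception.

```java
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical, JDK-only sketch of the iterator-tracking pattern described in RocksDB.java.
 * NativeHandle and TrackedIterator are stand-ins, not RocksJava or Spark classes.
 */
public class IteratorTrackerSketch {

  /** Stand-in for the JNI database handle. */
  static final class NativeHandle {
    final AtomicBoolean closed = new AtomicBoolean();
  }

  /** Stand-in for an iterator that must be closed while its handle is still open. */
  static final class TrackedIterator {
    private final NativeHandle handle;
    volatile boolean closed;

    TrackedIterator(NativeHandle handle) {
      this.handle = handle;
    }

    void close() {
      if (handle.closed.get()) {
        // With real JNI handles, this use-after-close is what can crash the JVM.
        throw new IllegalStateException("iterator closed after its DB");
      }
      closed = true;
    }
  }

  private final AtomicReference<NativeHandle> db = new AtomicReference<>(new NativeHandle());
  private final ConcurrentLinkedQueue<Reference<TrackedIterator>> tracker =
      new ConcurrentLinkedQueue<>();

  TrackedIterator newIterator() {
    NativeHandle handle = db.get();
    if (handle == null) {
      throw new IllegalStateException("DB is closed.");
    }
    TrackedIterator it = new TrackedIterator(handle);
    // Weak reference: an iterator the caller abandons can still be garbage collected.
    tracker.add(new WeakReference<>(it));
    return it;
  }

  /** Caller-initiated close: untrack the iterator, then close it only if the DB is open. */
  void closeIterator(TrackedIterator it) {
    tracker.removeIf(ref -> it.equals(ref.get()));
    synchronized (db) {
      if (db.get() != null && !it.closed) {
        it.close();
      }
    }
  }

  /** Closes every live tracked iterator first, then the handle itself. */
  void close() {
    synchronized (db) {
      NativeHandle handle = db.getAndSet(null);
      if (handle == null) {
        return; // already closed
      }
      for (Reference<TrackedIterator> ref : tracker) {
        TrackedIterator it = ref.get();
        if (it != null && !it.closed) {
          it.close();
        }
      }
      handle.closed.set(true);
    }
  }

  public static void main(String[] args) {
    IteratorTrackerSketch store = new IteratorTrackerSketch();
    TrackedIterator a = store.newIterator();
    TrackedIterator b = store.newIterator();
    store.closeIterator(a); // closed by the caller
    store.close();          // b is still open, so close() closes it first
    System.out.println(a.closed + " " + b.closed); // true true
  }
}
```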




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995270466


   **[Test build #146245 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146245/testReport)** for PR 34913 at commit [`4b603fd`](https://github.com/apache/spark/commit/4b603fd34c571d65da3e060d617b4694707b7b29).
    * This patch **fails Java style tests**.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
     * `public class RocksDB implements KVStore `
     * `  public static class TypeAliases `
     * `class RocksDBIterator<T> implements KVStoreIterator<T> `
     * `class RocksDBTypeInfo `
     * `  class Index `


[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995314163


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50720/
   


[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770332560



##########
File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/RocksDBBenchmark.java
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Slf4jReporter;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.*;
+
+/**
+ * A set of small benchmarks for the RocksDB implementation.
+ *
+ * The benchmarks are run over two different types (one with just a natural index, and one
+ * with a ref index), over a set of 2^20 elements, and the following tests are performed:
+ *
+ * - write (then update) elements in sequential natural key order
+ * - write (then update) elements in random natural key order
+ * - iterate over natural index, ascending and descending
+ * - iterate over ref index, ascending and descending
+ */
+@Ignore
+public class RocksDBBenchmark {
+
+  private static final int COUNT = 1024;
+  private static final AtomicInteger IDGEN = new AtomicInteger();
+  private static final MetricRegistry metrics = new MetricRegistry();
+  private static final Timer dbCreation = metrics.timer("dbCreation");
+  private static final Timer dbClose = metrics.timer("dbClose");
+
+  private RocksDB db;
+  private File dbpath;
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".rdb");
+    dbpath.delete();
+    try(Timer.Context ctx = dbCreation.time()) {
+      db = new RocksDB(dbpath);
+    }
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      try(Timer.Context ctx = dbClose.time()) {
+        db.close();
+      }
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @AfterClass
+  public static void report() {
+    if (metrics.getTimers().isEmpty()) {
+      return;
+    }
+
+    int headingPrefix = 0;
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      headingPrefix = Math.max(e.getKey().length(), headingPrefix);
+    }
+    headingPrefix += 4;
+
+    StringBuilder heading = new StringBuilder();
+    for (int i = 0; i < headingPrefix; i++) {
+      heading.append(" ");
+    }
+    heading.append("\tcount");
+    heading.append("\tmean");
+    heading.append("\tmin");
+    heading.append("\tmax");
+    heading.append("\t95th");
+    System.out.println(heading);
+
+    for (Map.Entry<String, Timer> e : metrics.getTimers().entrySet()) {
+      StringBuilder row = new StringBuilder();
+      row.append(e.getKey());
+      for (int i = 0; i < headingPrefix - e.getKey().length(); i++) {
+        row.append(" ");
+      }
+
+      Snapshot s = e.getValue().getSnapshot();
+      row.append("\t").append(e.getValue().getCount());
+      row.append("\t").append(toMs(s.getMean()));
+      row.append("\t").append(toMs(s.getMin()));
+      row.append("\t").append(toMs(s.getMax()));
+      row.append("\t").append(toMs(s.get95thPercentile()));
+
+      System.out.println(row);
+    }
+
+    Slf4jReporter.forRegistry(metrics).outputTo(LoggerFactory.getLogger(LevelDBBenchmark.class))

Review comment:
       LevelDBBenchmark.class -> RocksDBBenchmark.class
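Beyond the one-word fix, here is a hedged suggestion that this PR does not make. Deriving the class from `MethodHandles.lookup().lookupClass()` makes the logger follow the file, which guards against the kind of stale class literal flagged here when a file is copied to create a new benchmark. The sketch uses `java.util.logging` only to stay dependency-free. In the benchmark, the same `Class` object could be passed to SLF4J's `LoggerFactory.getLogger`. `LoggerNameSketch` is a made-up name.

```java
import java.lang.invoke.MethodHandles;
import java.util.logging.Logger;

public class LoggerNameSketch {

  // lookupClass() is the class in which this lookup() call appears, so the logger
  // name follows the file if it is copied and renamed.
  private static final Class<?> SELF = MethodHandles.lookup().lookupClass();

  // java.util.logging keeps the sketch dependency-free; with SLF4J,
  // LoggerFactory.getLogger(SELF) would be the equivalent.
  private static final Logger LOG = Logger.getLogger(SELF.getName());

  static String loggerName() {
    return LOG.getName();
  }

  public static void main(String[] args) {
    // Prints the fully qualified name of this class.
    System.out.println(loggerName());
  }
}
```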




[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-995356509


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146246/
   


[GitHub] [spark] BelodengKlaus commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
BelodengKlaus commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r771180938



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
+    .setFormatVersion(5);
+
+  private static final Options dbOptions = new Options()
+    .setCreateIfMissing(true)
+    .setTableFormatConfig(tableFormatConfig)
+    .setStatistics(new Statistics());
+
+  private static final WriteOptions writeOptions = new WriteOptions().setSync(true);
+
+  private final AtomicReference<org.rocksdb.RocksDB> _db;
+
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, RocksDBTypeInfo> types;
+
+  /**
+   * Trying to close a JNI RocksDB handle with a closed DB causes JVM crashes. This is used to
+   * ensure that all iterators are correctly closed before RocksDB is closed. Use weak references
+   * to ensure that the iterator can be GCed, when it is only referenced here.
+   */
+  private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+  public RocksDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public RocksDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+    this._db = new AtomicReference<>(org.rocksdb.RocksDB.open(dbOptions, path.toString()));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        close();
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+
+    iteratorTracker = new ConcurrentLinkedQueue<>();
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    RocksDBTypeInfo ti = getTypeInfo(value.getClass());
+    byte[] data = serializer.serialize(value);
+    synchronized (ti) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        updateBatch(writeBatch, value, data, value.getClass(), ti.naturalIndex(), ti.indices());
+        db().write(writeOptions, writeBatch);
+      }
+    }
+  }
+
+  public void writeAll(List<?> values) throws Exception {
+    Preconditions.checkArgument(values != null && !values.isEmpty(),
+      "Non-empty values required.");
+
+    // Group by class, in case the values contain objects of different classes.
+    // The typical use case is for all values to be of a single class.
+    // A NullPointerException will be thrown if values contain a null object.
+    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
+        values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
+
+      final Iterator<?> valueIter = entry.getValue().iterator();
+      final Iterator<byte[]> serializedValueIter;
+
+      // Serialize outside synchronized block, only the values of this class group
+      List<byte[]> list = new ArrayList<>(entry.getValue().size());
+      for (Object value : entry.getValue()) {
+        list.add(serializer.serialize(value));
+      }
+      serializedValueIter = list.iterator();
+
+      final Class<?> klass = entry.getKey();
+      final RocksDBTypeInfo ti = getTypeInfo(klass);
+
+      synchronized (ti) {
+        final RocksDBTypeInfo.Index naturalIndex = ti.naturalIndex();
+        final Collection<RocksDBTypeInfo.Index> indices = ti.indices();
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+          while (valueIter.hasNext()) {
+            updateBatch(writeBatch, valueIter.next(), serializedValueIter.next(), klass,
+                    naturalIndex, indices);

Review comment:
       indent






[GitHub] [spark] SparkQA removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996901346


   **[Test build #146338 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146338/testReport)** for PR 34913 at commit [`abb6b6d`](https://github.com/apache/spark/commit/abb6b6dc102ffd4a214b9a016f21edfb7929f901).




[GitHub] [spark] BelodengKlaus commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
BelodengKlaus commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996417562


   The performance results show that LevelDB is faster than RocksDB. Is the expectation that RocksDB may become better than LevelDB once Apple Silicon is supported?
   (Or is the only reason that LevelDB has become inactive?)




[GitHub] [spark] SparkQA commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996403768


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50781/
   




[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770268700



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()
+    .setFormatVersion(5);
+
+  private static final Options dbOptions = new Options()
+    .setCreateIfMissing(true)
+    .setTableFormatConfig(tableFormatConfig)
+    .setStatistics(new Statistics());
+
+  private static final WriteOptions writeOptions = new WriteOptions().setSync(true);
+
+  private AtomicReference<org.rocksdb.RocksDB> _db;
+
+  final KVStoreSerializer serializer;
+
+  /**
+   * Keep a mapping of class names to a shorter, unique ID managed by the store. This serves two
+   * purposes: make the keys stored on disk shorter, and spread out the keys, since class names
+   * will often have a long, redundant prefix (think "org.apache.spark.").
+   */
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, RocksDBTypeInfo> types;
+
+  /**
+   * Trying to close a JNI RocksDB handle with a closed DB causes JVM crashes. This is used to
+   * ensure that all iterators are correctly closed before RocksDB is closed. Use weak references
+   * to ensure that the iterator can be GCed, when it is only referenced here.
+   */
+  private final ConcurrentLinkedQueue<Reference<RocksDBIterator<?>>> iteratorTracker;
+
+  public RocksDB(File path) throws Exception {
+    this(path, new KVStoreSerializer());
+  }
+
+  public RocksDB(File path, KVStoreSerializer serializer) throws Exception {
+    this.serializer = serializer;
+    this.types = new ConcurrentHashMap<>();
+    this._db = new AtomicReference<>(org.rocksdb.RocksDB.open(dbOptions, path.toString()));
+
+    byte[] versionData = db().get(STORE_VERSION_KEY);
+    if (versionData != null) {
+      long version = serializer.deserializeLong(versionData);
+      if (version != STORE_VERSION) {
+        close();
+        throw new UnsupportedStoreVersionException();
+      }
+    } else {
+      db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+    }
+
+    Map<String, byte[]> aliases;
+    try {
+      aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+    } catch (NoSuchElementException e) {
+      aliases = new HashMap<>();
+    }
+    typeAliases = new ConcurrentHashMap<>(aliases);
+
+    iteratorTracker = new ConcurrentLinkedQueue<>();
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    try {
+      return get(METADATA_KEY, klass);
+    } catch (NoSuchElementException nsee) {
+      return null;
+    }
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    if (value != null) {
+      put(METADATA_KEY, value);
+    } else {
+      db().delete(METADATA_KEY);
+    }
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+    byte[] data = db().get(key);
+    if (data == null) {
+      throw new NoSuchElementException(new String(key, UTF_8));
+    }
+    return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    db().put(key, serializer.serialize(value));
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    byte[] key = getTypeInfo(klass).naturalIndex().start(null, naturalKey);
+    return get(key, klass);
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Preconditions.checkArgument(value != null, "Null values are not allowed.");
+    RocksDBTypeInfo ti = getTypeInfo(value.getClass());
+    byte[] data = serializer.serialize(value);
+    synchronized (ti) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        updateBatch(writeBatch, value, data, value.getClass(), ti.naturalIndex(), ti.indices());
+        db().write(writeOptions, writeBatch);
+      }
+    }
+  }
+
+  public void writeAll(List<?> values) throws Exception {
+    Preconditions.checkArgument(values != null && !values.isEmpty(),
+      "Non-empty values required.");
+
+    // Group by class, in case the values contain objects of different classes.
+    // The typical use case is for all values to be of a single class.
+    // A NullPointerException will be thrown if values contain a null object.
+    for (Map.Entry<? extends Class<?>, ? extends List<?>> entry :
+        values.stream().collect(Collectors.groupingBy(Object::getClass)).entrySet()) {
+
+      final Iterator<?> valueIter = entry.getValue().iterator();
+      final Iterator<byte[]> serializedValueIter;
+
+      // Serialize outside synchronized block, only the values of this class group
+      List<byte[]> list = new ArrayList<>(entry.getValue().size());
+      for (Object value : entry.getValue()) {
+        list.add(serializer.serialize(value));
+      }
+      serializedValueIter = list.iterator();
+
+      final Class<?> klass = entry.getKey();
+      final RocksDBTypeInfo ti = getTypeInfo(klass);
+
+      synchronized (ti) {
+        final RocksDBTypeInfo.Index naturalIndex = ti.naturalIndex();
+        final Collection<RocksDBTypeInfo.Index> indices = ti.indices();
+
+        try (WriteBatch writeBatch = new WriteBatch()) {
+          while (valueIter.hasNext()) {
+            updateBatch(writeBatch, valueIter.next(), serializedValueIter.next(), klass,
+                    naturalIndex, indices);
+          }
+          db().write(writeOptions, writeBatch);
+        }
+      }
+    }
+  }
+
+  private void updateBatch(
+      WriteBatch batch,
+      Object value,
+      byte[] data,
+      Class<?> klass,
+      RocksDBTypeInfo.Index naturalIndex,
+      Collection<RocksDBTypeInfo.Index> indices) throws Exception {
+    Object existing;
+    try {
+      existing = get(naturalIndex.entityKey(null, value), klass);
+    } catch (NoSuchElementException e) {
+      existing = null;
+    }
+
+    PrefixCache cache = new PrefixCache(value);
+    byte[] naturalKey = naturalIndex.toKey(naturalIndex.getValue(value));
+    for (RocksDBTypeInfo.Index idx : indices) {
+      byte[] prefix = cache.getPrefix(idx);
+      idx.add(batch, value, existing, data, naturalKey, prefix);
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    Preconditions.checkArgument(naturalKey != null, "Null keys are not allowed.");
+    try (WriteBatch writeBatch = new WriteBatch()) {
+      RocksDBTypeInfo ti = getTypeInfo(type);
+      byte[] key = ti.naturalIndex().start(null, naturalKey);
+      synchronized (ti) {
+        byte[] data = db().get(key);
+        if (data != null) {
+          Object existing = serializer.deserialize(data, type);
+          PrefixCache cache = new PrefixCache(existing);
+          byte[] keyBytes = ti.naturalIndex().toKey(ti.naturalIndex().getValue(existing));
+          for (RocksDBTypeInfo.Index idx : ti.indices()) {
+            idx.remove(writeBatch, existing, keyBytes, cache.getPrefix(idx));
+          }
+          db().write(writeOptions, writeBatch);
+        }
+      }
+    } catch (NoSuchElementException nse) {
+      // Ignore.
+    }
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    return new KVStoreView<T>() {
+      @Override
+      public Iterator<T> iterator() {
+        try {
+          RocksDBIterator<T> it = new RocksDBIterator<>(type, RocksDB.this, this);
+          iteratorTracker.add(new WeakReference<>(it));
+          return it;
+        } catch (Exception e) {
+          throw Throwables.propagate(e);
+        }
+      }
+    };
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    RocksDBTypeInfo.Index naturalIndex = getTypeInfo(klass).naturalIndex();
+    boolean removed = false;
+    KVStoreView<T> view = view(klass).index(index);
+
+    for (Object indexValue : indexValues) {
+      for (T value: view.first(indexValue).last(indexValue)) {
+        Object itemKey = naturalIndex.getValue(value);
+        delete(klass, itemKey);
+        removed = true;
+      }
+    }
+
+    return removed;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    RocksDBTypeInfo.Index idx = getTypeInfo(type).naturalIndex();
+    return idx.getCount(idx.end(null));
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    RocksDBTypeInfo.Index idx = getTypeInfo(type).index(index);
+    return idx.getCount(idx.end(null, indexedValue));
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized (this._db) {
+      org.rocksdb.RocksDB _db = this._db.getAndSet(null);
+      if (_db == null) {
+        return;
+      }
+
+      try {
+        if (iteratorTracker != null) {
+          for (Reference<RocksDBIterator<?>> ref: iteratorTracker) {
+            RocksDBIterator<?> it = ref.get();
+            if (it != null) {
+              it.close();
+            }
+          }
+        }
+        _db.close();
+      } catch (IOException ioe) {
+        throw ioe;
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
+    }
+  }
+
+  /**
+   * Closes the given iterator if the DB is still open. Trying to close a JNI RocksDB handle
+   * with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
+   */
+  void closeIterator(RocksDBIterator<?> it) throws IOException {
+    notifyIteratorClosed(it);
+    synchronized (this._db) {
+      org.rocksdb.RocksDB _db = this._db.get();
+      if (_db != null) {
+        it.close();
+      }
+    }
+  }
+
+  /**
+   * Remove iterator from iterator tracker. `RocksDBIterator` calls it to notify
+   * iterator is closed.
+   */
+  void notifyIteratorClosed(RocksDBIterator<?> it) {
+    iteratorTracker.removeIf(ref -> it.equals(ref.get()));
+  }
+
+  /** Returns metadata about indices for the given type. */
+  RocksDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
+    RocksDBTypeInfo ti = types.get(type);
+    if (ti == null) {
+      RocksDBTypeInfo tmp = new RocksDBTypeInfo(this, type, getTypeAlias(type));
+      ti = types.putIfAbsent(type, tmp);
+      if (ti == null) {
+        ti = tmp;
+      }
+    }
+    return ti;
+  }
+
+  /**
+   * Try to avoid use-after close since that has the tendency of crashing the JVM. This doesn't
+   * prevent methods that retrieved the instance from using it after close, but hopefully will
+   * catch most cases; otherwise, we'll need some kind of locking.
+   */
+  org.rocksdb.RocksDB db() {
+    org.rocksdb.RocksDB _db = this._db.get();
+    if (_db == null) {
+      throw new IllegalStateException("DB is closed.");
+    }
+    return _db;
+  }
+
+  private byte[] getTypeAlias(Class<?> klass) throws Exception {
+    byte[] alias = typeAliases.get(klass.getName());
+    if (alias == null) {
+      synchronized (typeAliases) {
+        byte[] tmp = String.valueOf(typeAliases.size()).getBytes(UTF_8);
+        alias = typeAliases.putIfAbsent(klass.getName(), tmp);
+        if (alias == null) {
+          alias = tmp;
+          put(TYPE_ALIASES_KEY, new TypeAliases(typeAliases));
+        }
+      }
+    }
+    return alias;
+  }
+
+  /** Needs to be public for Jackson. */
+  public static class TypeAliases {
+
+    public Map<String, byte[]> aliases;
+
+    TypeAliases(Map<String, byte[]> aliases) {
+      this.aliases = aliases;
+    }
+
+    TypeAliases() {
+      this(null);
+    }
+
+  }
+
+  private static class PrefixCache {

Review comment:
       `PrefixCache` is defined again here (duplicating the one in `LevelDB`) because this PR does not want to introduce too much refactoring work, right?
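If the duplication were refactored in a follow-up, one option is a single cache that is generic over the index type. The sketch below assumes a hypothetical `IndexLike` interface capturing the index operations a prefix cache needs (`isChild`, `parent`, `getValue`, `childPrefix`); it is not an existing Spark type, checked exceptions that the real index methods may declare are omitted for brevity, and `ToyIndex` exists only to exercise the cache.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of a backend-neutral PrefixCache that LevelDB and RocksDB could share.
 * IndexLike is a hypothetical interface, not an existing Spark type.
 */
public class SharedPrefixCacheSketch {

  /** The few index operations the cache needs (checked exceptions omitted for brevity). */
  interface IndexLike<I extends IndexLike<I>> {
    boolean isChild();
    I parent();
    Object getValue(Object entity);
    byte[] childPrefix(Object parentValue);
  }

  /** Caches, per parent index, the key prefix derived from one entity's parent value. */
  static final class PrefixCache<I extends IndexLike<I>> {
    private final Object entity;
    private final Map<I, byte[]> prefixes = new HashMap<>();

    PrefixCache(Object entity) {
      this.entity = entity;
    }

    /** Returns the prefix for a child index, or null for a top-level index. */
    byte[] getPrefix(I idx) {
      if (!idx.isChild()) {
        return null;
      }
      I parent = idx.parent();
      byte[] prefix = prefixes.get(parent);
      if (prefix == null) {
        // Computed once per parent index, then reused by every sibling child index.
        prefix = parent.childPrefix(parent.getValue(entity));
        prefixes.put(parent, prefix);
      }
      return prefix;
    }
  }

  /** Toy index used only to exercise the cache; counts childPrefix computations. */
  static final class ToyIndex implements IndexLike<ToyIndex> {
    private final ToyIndex parent;
    int childPrefixCalls;

    ToyIndex(ToyIndex parent) {
      this.parent = parent;
    }

    @Override public boolean isChild() { return parent != null; }
    @Override public ToyIndex parent() { return parent; }
    @Override public Object getValue(Object entity) { return entity; }

    @Override
    public byte[] childPrefix(Object parentValue) {
      childPrefixCalls++;
      return ("child-of:" + parentValue).getBytes(StandardCharsets.UTF_8);
    }
  }
}
```

`ToyIndex` relies on identity `equals`/`hashCode`, which is enough for a `HashMap` key here; a shared version would only need each backend's `Index` class to implement the interface.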






[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770295084



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBIterator.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.RocksIterator;
+
+import java.io.IOException;
+import java.util.*;
+
+class RocksDBIterator<T> implements KVStoreIterator<T> {
+
+  private final RocksDB db;
+  private final boolean ascending;
+  private final RocksIterator it;
+  private final Class<T> type;
+  private final RocksDBTypeInfo ti;
+  private final RocksDBTypeInfo.Index index;
+  private final byte[] indexKeyPrefix;
+  private final byte[] end;
+  private final long max;
+
+  private boolean checkedNext;
+  private byte[] next;
+  private boolean closed;
+  private long count;
+
+  RocksDBIterator(Class<T> type, RocksDB db, KVStoreView<T> params) throws Exception {
+    this.db = db;
+    this.ascending = params.ascending;
+    this.it = db.db().newIterator();
+    this.type = type;
+    this.ti = db.getTypeInfo(type);
+    this.index = ti.index(params.index);
+    this.max = params.max;
+
+    Preconditions.checkArgument(!index.isChild() || params.parent != null,
+      "Cannot iterate over child index %s without parent value.", params.index);
+    byte[] parent = index.isChild() ? index.parent().childPrefix(params.parent) : null;
+
+    this.indexKeyPrefix = index.keyPrefix(parent);
+
+    byte[] firstKey;
+    if (params.first != null) {
+      if (ascending) {
+        firstKey = index.start(parent, params.first);
+      } else {
+        firstKey = index.end(parent, params.first);
+      }
+    } else if (ascending) {
+      firstKey = index.keyPrefix(parent);
+    } else {
+      firstKey = index.end(parent);
+    }
+    it.seek(firstKey);
+
+    byte[] end = null;
+    if (ascending) {
+      if (params.last != null) {
+        end = index.end(parent, params.last);
+      } else {
+        end = index.end(parent);
+      }
+    } else {
+      if (params.last != null) {
+        end = index.start(parent, params.last);
+      }
+      if (it.isValid() && compare(it.key(), indexKeyPrefix) <= 0) {
+        // continue
+      } else {
+        it.prev();

Review comment:
       In `LevelDBIterator`, `it.peekNext().getKey()` is called to get `nextKey`, and `peekNext` throws `NoSuchElementException` if `iterator.isValid()` is false.
   
   For `RocksIterator`, `it.prev()` is called when `it.isValid()` is false. Could this operation throw some other exception?
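Some context for this question: RocksDB's C++ `Iterator` documents `Prev()` as `REQUIRES: Valid()`, so calling `it.prev()` on an invalid `RocksIterator` should not be assumed to fail with a clean Java exception. A defensive pattern is to jump with `seekToLast()` when `seek()` runs off the end, and to call `prev()` only on a valid iterator; RocksJava's `RocksIterator` also exposes `seekForPrev(byte[])` for "last key <= target". The sketch below uses a hypothetical `PositionedIterator` that borrows `RocksIterator` method names but is backed by a `TreeMap`, and it positions at the last key <= a bound. It is a generic illustration, not the prefix comparison Spark's `RocksDBIterator` performs.

```java
import java.util.TreeMap;

/**
 * Sketch of guarding a backwards step after seek(). PositionedIterator is a hypothetical
 * stand-in that borrows method names from org.rocksdb.RocksIterator (isValid, key, seek,
 * seekToLast, prev) but is backed by an in-memory TreeMap so it runs without RocksDB.
 */
public class DescendingSeekSketch {

  static final class PositionedIterator {
    private final TreeMap<String, String> data;
    private String current; // null means the iterator is not valid

    PositionedIterator(TreeMap<String, String> data) {
      this.data = data;
    }

    boolean isValid() {
      return current != null;
    }

    String key() {
      if (current == null) {
        throw new IllegalStateException("key() on invalid iterator");
      }
      return current;
    }

    /** Positions at the first key >= target, or becomes invalid if there is none. */
    void seek(String target) {
      current = data.ceilingKey(target);
    }

    void seekToLast() {
      current = data.isEmpty() ? null : data.lastKey();
    }

    /** Enforces the documented precondition of RocksDB's Prev(): the iterator must be valid. */
    void prev() {
      if (current == null) {
        throw new IllegalStateException("prev() on invalid iterator");
      }
      current = data.lowerKey(current);
    }
  }

  /** Positions it at the last key <= bound (invalid if none), never calling prev() blindly. */
  static void seekForDescending(PositionedIterator it, String bound) {
    it.seek(bound);
    if (!it.isValid()) {
      // Every key is < bound: seek() ran off the end, so jump to the last entry instead.
      it.seekToLast();
    } else if (it.key().compareTo(bound) > 0) {
      // Landed on the first key after bound: step back one entry (valid, so prev() is safe).
      it.prev();
    }
  }
}
```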
   
   
   
   






[GitHub] [spark] LuciferYang commented on a change in pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
LuciferYang commented on a change in pull request #34913:
URL: https://github.com/apache/spark/pull/34913#discussion_r770235264



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDB.java
##########
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.ref.Reference;
+import java.lang.ref.WeakReference;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Options;
+import org.rocksdb.Statistics;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+
+import org.apache.spark.annotation.Private;
+
+/**
+ * Implementation of KVStore that uses RocksDB as the underlying data store.
+ */
+@Private
+public class RocksDB implements KVStore {
+
+  static {
+    org.rocksdb.RocksDB.loadLibrary();
+  }
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8);
+
+  private static final BlockBasedTableConfig tableFormatConfig = new BlockBasedTableConfig()

Review comment:
       Do `tableFormatConfig` and `dbOptions` need to be class variables? It seems that they are only used when initializing `_db`.
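One consideration for this question: the RocksJava javadoc for `RocksDB.open` advises that the `Options` instance should not be disposed before all DBs using it have been closed. So if these stop being static, they would more naturally become instance fields released in `close()` after the DB, rather than locals closed right after `open`. The sketch below illustrates that ownership pattern with hypothetical stand-ins (`NativeHandle`, `OwnedResourcesSketch`), not RocksJava classes: resources are owned per instance and closed in reverse order of acquisition.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Sketch: per-instance ownership of native-style resources, released in reverse order of
 * acquisition (DB before the options it was opened with). NativeHandle is a hypothetical
 * stand-in for RocksJava objects such as Options or RocksDB, so this runs without RocksDB.
 */
public class OwnedResourcesSketch implements AutoCloseable {

  /** Hypothetical stand-in for a native handle; records its name when closed. */
  static final class NativeHandle implements AutoCloseable {
    private final String name;
    private final List<String> closeLog;

    NativeHandle(String name, List<String> closeLog) {
      this.name = name;
      this.closeLog = closeLog;
    }

    @Override
    public void close() {
      closeLog.add(name);
    }
  }

  // push/pop on an ArrayDeque gives LIFO order: the newest resource is closed first.
  private final Deque<AutoCloseable> owned = new ArrayDeque<>();

  /** Registers a resource owned by this store instance and returns it. */
  <R extends AutoCloseable> R own(R resource) {
    owned.push(resource);
    return resource;
  }

  /** Closes every owned resource, newest first, keeping the first failure and suppressing the rest. */
  @Override
  public void close() throws Exception {
    Exception first = null;
    while (!owned.isEmpty()) {
      try {
        owned.pop().close();
      } catch (Exception e) {
        if (first == null) {
          first = e;
        } else {
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }

  /** Acquires "options" then "db", closes the store, and returns the observed close order. */
  static List<String> demo() {
    List<String> log = new ArrayList<>();
    try (OwnedResourcesSketch store = new OwnedResourcesSketch()) {
      store.own(new NativeHandle("options", log)); // acquired first, like dbOptions
      store.own(new NativeHandle("db", log));      // opened with the options above
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
    return log;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [db, options]
  }
}
```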






[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996427890


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50781/
   




[GitHub] [spark] SparkQA removed a comment on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996385201


   **[Test build #146309 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146309/testReport)** for PR 34913 at commit [`405b84c`](https://github.com/apache/spark/commit/405b84c66ee5bc7154b2ff2583b0b9e90dc00649).




[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996982126


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50812/
   




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996454733


   @BelodengKlaus 
   > the performance shows that LevelDB is faster than RocksDB. And after Apple Silicon support RocksDB may better than LevelDB ? (maybe only reason is the LevelDB became inactive?)
   
   Yes. Currently, neither DB works at all with a native Java distribution on Apple Silicon.
   
   In Apache Spark 3.3, we will have both backends, and users can choose either one for their use cases. Performance tuning will follow as well.




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996902745


   To @LuciferYang, it's possible, but it's beyond the scope of this PR.
   To @klaus-xiong, thank you for the review. I added `RocksDBTypeInfoSuite` and addressed the indentation part too.
   To @viirya, yes.
   - We will keep both in Apache Spark 3.3, and the backend will be configurable, like `spark.sql.orc.impl`.
   - I'm working on removing them, together with `ExtendedRocksDBTest`, within the Apache Spark 3.3 timeframe.
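A minimal sketch of config-driven backend selection in the spirit of `spark.sql.orc.impl`. The key `example.kvstore.backend`, the `DiskBackend` enum, and the LevelDB default are hypothetical placeholders, not the configuration Spark actually shipped.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

/**
 * Sketch of picking a KVStore disk backend from configuration, in the spirit of how
 * spark.sql.orc.impl selects an implementation. The key, enum and default below are
 * hypothetical placeholders, not Spark's actual configuration.
 */
public class KVBackendSelectionSketch {

  /** Hypothetical set of selectable disk backends. */
  enum DiskBackend { LEVELDB, ROCKSDB }

  /** Hypothetical configuration key. */
  static final String BACKEND_KEY = "example.kvstore.backend";

  /** Resolves the backend case-insensitively, defaulting to LevelDB (an assumption). */
  static DiskBackend resolve(Map<String, String> conf) {
    String raw = conf.getOrDefault(BACKEND_KEY, "leveldb");
    try {
      return DiskBackend.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Unsupported value for " + BACKEND_KEY + ": " + raw + " (expected leveldb or rocksdb)", e);
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve(Collections.emptyMap()));                           // LEVELDB
    System.out.println(resolve(Collections.singletonMap(BACKEND_KEY, "RocksDB"))); // ROCKSDB
  }
}
```

Failing fast on an unknown value, rather than silently falling back, keeps a typo in the setting from quietly selecting the other store.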




[GitHub] [spark] dongjoon-hyun commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-996987148


   Thank you all. All tests passed, although the UI doesn't show it yet. Merged to master.
   
   [Screenshot: Screen Shot 2021-12-17 at 11 37 26 AM (https://user-images.githubusercontent.com/9700541/146598745-4f882877-aece-4ff8-94aa-5eacb716758c.png)]


[GitHub] [spark] dongjoon-hyun closed pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun closed pull request #34913:
URL: https://github.com/apache/spark/pull/34913


[GitHub] [spark] AmplabJenkins commented on pull request #34913: [SPARK-37655][CORE] Add `RocksDB` Implementation for `KVStore`

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34913:
URL: https://github.com/apache/spark/pull/34913#issuecomment-997018139


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146338/
   
