Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/06 04:06:39 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #28412: [SPARK-31608][CORE][WEBUI] Add a new type of KVStore to make loading UI faster

HeartSaVioR commented on a change in pull request #28412:
URL: https://github.com/apache/spark/pull/28412#discussion_r436233082



##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();

Review comment:
       Same here.

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    KVStore store = getStore();

Review comment:
       Same here.
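The HybridStore Javadoc quoted above describes the mechanism: data is written to an InMemoryStore first, and a background thread then dumps it to LevelDB and switches the store over. The following is a minimal sketch of that idea. Every name in it is hypothetical (`SimpleStore`, `MapStore`, `SwitchingStore`, `switchToDisk`), and a map-backed store stands in for LevelDB. This is not Spark's implementation. The `AtomicBoolean` mirrors `shouldUseInMemoryStore`, and rejecting writes after the switch starts mirrors the `backgroundThread != null` check in the quoted `write()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class SwitchingStoreDemo {

  // Hypothetical stand-in for the KVStore interface.
  interface SimpleStore {
    void write(String key, String value);

    String read(String key);
  }

  // Map-backed store, used here for both the in-memory side and the "disk" side.
  static class MapStore implements SimpleStore {
    final Map<String, String> data = new ConcurrentHashMap<>();

    @Override
    public void write(String key, String value) {
      data.put(key, value);
    }

    @Override
    public String read(String key) {
      return data.get(key);
    }
  }

  static class SwitchingStore implements SimpleStore {
    private final MapStore memory = new MapStore();
    private final SimpleStore disk;
    // Mirrors shouldUseInMemoryStore: true until the copy has finished.
    private final AtomicBoolean useMemory = new AtomicBoolean(true);
    // Set once the switch starts, like backgroundThread != null in HybridStore.
    private volatile boolean switching = false;

    SwitchingStore(SimpleStore disk) {
      this.disk = disk;
    }

    private SimpleStore active() {
      return useMemory.get() ? memory : disk;
    }

    @Override
    public void write(String key, String value) {
      if (switching) {
        throw new IllegalStateException("write() called after switching began");
      }
      active().write(key, value);
    }

    @Override
    public String read(String key) {
      return active().read(key);
    }

    // Copies all entries to the disk store on a background thread, then
    // flips the flag so that later reads are served from the disk store.
    Thread switchToDisk() {
      switching = true;
      Thread copier = new Thread(() -> {
        memory.data.forEach(disk::write);
        useMemory.set(false);
      });
      copier.start();
      return copier;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    MapStore disk = new MapStore();
    SwitchingStore store = new SwitchingStore(disk);
    store.write("app-1", "running");
    store.switchToDisk().join();
    System.out.println(store.read("app-1"));  // running
    System.out.println(disk.read("app-1"));   // running
  }
}
```

Reads keep hitting the in-memory copy until the copy finishes. As a result, the UI can serve the application without waiting for the disk write.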

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -128,6 +129,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
   private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
+  private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)
+  private val maxInMemoryStoreUsage = conf.get(History.MAX_IN_MEMORY_STORE_USAGE)
+  private val currentInMemoryStoreUsage = new java.util.concurrent.atomic.AtomicLong(0L)

Review comment:
       Let's have a separate class for memory leasing, like HistoryServerDiskManager. It would be much simpler than HistoryServerDiskManager, but it would keep the memory accounting out of FsHistoryProvider.
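In this hunk, FsHistoryProvider tracks the budget itself through `maxInMemoryStoreUsage` and an `AtomicLong` named `currentInMemoryStoreUsage`. A minimal sketch of the separate class the reviewer suggests might look like the following. `MemoryLeaseManager`, `lease`, and `release` are hypothetical names, and the caller is assumed to supply a byte estimate for each lease. A compare-and-set loop prevents concurrent leases from pushing usage past the cap.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical memory-lease helper. It only tracks a budget; it does not
 * measure real heap usage.
 */
public class MemoryLeaseManager {
  private final long maxUsage;
  private final AtomicLong currentUsage = new AtomicLong(0L);

  public MemoryLeaseManager(long maxUsage) {
    this.maxUsage = maxUsage;
  }

  /**
   * Tries to reserve {@code bytes}. Returns true if the reservation fits in the
   * budget, false otherwise (the caller would then fall back to the disk store).
   */
  public boolean lease(long bytes) {
    while (true) {
      long current = currentUsage.get();
      long next = current + bytes;
      if (next > maxUsage) {
        return false;
      }
      // Only commit if no other thread changed the usage in the meantime.
      if (currentUsage.compareAndSet(current, next)) {
        return true;
      }
    }
  }

  /** Returns a previously leased amount to the budget. */
  public void release(long bytes) {
    currentUsage.addAndGet(-bytes);
  }

  public long currentUsage() {
    return currentUsage.get();
  }

  public static void main(String[] args) {
    MemoryLeaseManager manager = new MemoryLeaseManager(100L);
    System.out.println(manager.lease(60L));      // true
    System.out.println(manager.lease(60L));      // false: 120 > 100
    manager.release(60L);
    System.out.println(manager.currentUsage());  // 0
  }
}
```

A caller would release the lease when the in-memory store is closed or has been switched to disk. How the size estimate is obtained is left open here.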

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();

Review comment:
       This is the only kind of object stored after replaying, though I think not migrating these objects is OK, because they can be recomputed. It's a trade-off between complexity and the possible overhead of recomputing them if they were already computed once while data was being dumped to LevelDB.
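The quoted field comment describes queueing CachedQuantile objects while the in-memory store is active and replaying them into LevelDB before the switch completes. A minimal sketch of that queue-then-drain step follows. `QuantileQueueDemo`, `recordWrite`, and `drainTo` are hypothetical names, and a plain `List` stands in for the destination store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

public class QuantileQueueDemo {
  // Objects written while the in-memory store is still the active store.
  private final ConcurrentLinkedQueue<Object> pending = new ConcurrentLinkedQueue<>();

  /** Remembers an object so it can be replayed into the disk store later. */
  void recordWrite(Object value) {
    pending.add(value);
  }

  /**
   * Moves queued objects into {@code destination} until the queue is observed
   * empty (so objects added while draining are also moved) and returns the count.
   */
  int drainTo(Consumer<Object> destination) {
    int moved = 0;
    Object next;
    while ((next = pending.poll()) != null) {
      destination.accept(next);
      moved++;
    }
    return moved;
  }

  public static void main(String[] args) {
    QuantileQueueDemo queue = new QuantileQueueDemo();
    queue.recordWrite("quantile-a");
    queue.recordWrite("quantile-b");
    List<Object> diskStore = new ArrayList<>();
    System.out.println(queue.drainTo(diskStore::add));  // 2
    System.out.println(diskStore);  // [quantile-a, quantile-b]
  }
}
```

In this sketch, a write that arrives after the loop sees an empty queue but before the store flag flips would still be missed. That extra coordination is part of the complexity the reviewer weighs against simply recomputing the quantiles.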

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();

Review comment:
       Same here.

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();

Review comment:
       Let's make it shorter: `return getStore().getMetadata(klass);`. We don't do any null check here, so there's no need to split this into separate statements.
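Applied to the delegating methods, the reviewer's suggestion collapses each body to a single expression. The sketch below uses a hypothetical `Store` interface with the same `getMetadata`/`setMetadata` shape as `KVStore`, plus a hypothetical `getStore()` that returns a fixed delegate. It is an illustration, not the PR's final code.

```java
import java.util.HashMap;
import java.util.Map;

public class DelegationDemo {

  // Hypothetical stand-in for the KVStore metadata methods.
  interface Store {
    <T> T getMetadata(Class<T> klass) throws Exception;

    void setMetadata(Object value) throws Exception;
  }

  // Simple backing store keyed by the metadata object's class.
  static class MetadataStore implements Store {
    private final Map<Class<?>, Object> metadata = new HashMap<>();

    @Override
    public <T> T getMetadata(Class<T> klass) {
      return klass.cast(metadata.get(klass));
    }

    @Override
    public void setMetadata(Object value) {
      metadata.put(value.getClass(), value);
    }
  }

  static class DelegatingStore implements Store {
    private final Store delegate = new MetadataStore();

    private Store getStore() {
      return delegate;
    }

    @Override
    public <T> T getMetadata(Class<T> klass) throws Exception {
      // No null check or post-processing, so one expression is enough.
      return getStore().getMetadata(klass);
    }

    @Override
    public void setMetadata(Object value) throws Exception {
      getStore().setMetadata(value);
    }
  }

  public static void main(String[] args) throws Exception {
    Store store = new DelegatingStore();
    store.setMetadata("v1");
    System.out.println(store.getMetadata(String.class));  // v1
  }
}
```

The "Same here." comments in this review point at the same local-variable pattern in the other delegating methods.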

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();

Review comment:
       Same here.

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();

Review comment:
       Same here.

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue to store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();

Review comment:
       Same here.

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate whether we should use inMemoryStore or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that have been written
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();

Review comment:
       Same here.
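The quoted methods all route through getStore(), whose body isn't part of this hunk. Below is a minimal sketch of how the selection could work, assuming getStore() simply reads the shouldUseInMemoryStore flag on every call. `Backend` and `StoreSelector` are hypothetical stand-ins for KVStore and HybridStore.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for KVStore; a name is enough to tell backends apart.
interface Backend {
  String name();
}

// Hypothetical stand-in for HybridStore's store-selection logic.
class StoreSelector {
  private final Backend inMemory;
  private final Backend onDisk;
  // Mirrors shouldUseInMemoryStore: true until the background copy finishes.
  private final AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);

  StoreSelector(Backend inMemory, Backend onDisk) {
    this.inMemory = inMemory;
    this.onDisk = onDisk;
  }

  // Assumed behavior of getStore(): read the flag once and pick a backend.
  Backend getStore() {
    return shouldUseInMemoryStore.get() ? inMemory : onDisk;
  }

  // Called once all data has been copied to the disk store.
  void switchToDisk() {
    shouldUseInMemoryStore.set(false);
  }
}
```

Because each delegating method calls getStore() once and then keeps using that reference, a call that starts just before the flag flips still completes against the in-memory store.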

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type, index, indexedValue);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join();
+      }
+      if (levelDB != null) {
+        levelDB.close();
+      }
+      if (inMemoryStore != null) {
+        inMemoryStore.close();
+      }
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+    }
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();

Review comment:
       Same here, if the line doesn't exceed the limit.
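Assuming the earlier suggestion (not quoted in this digest) is to drop single-use local variables, each delegating method collapses to a single return statement. A sketch using a hypothetical `CountingStore` interface in place of KVStore:

```java
// Hypothetical single-method stand-in for KVStore.
interface CountingStore {
  long count(Class<?> type) throws Exception;
}

class InlinedDelegate implements CountingStore {
  private final CountingStore delegate;

  InlinedDelegate(CountingStore delegate) {
    this.delegate = delegate;
  }

  private CountingStore getStore() {
    return delegate;
  }

  // Before: KVStore store = getStore(); long count = store.count(type); return count;
  // After: the temporaries are inlined into one return statement.
  @Override
  public long count(Class<?> type) throws Exception {
    return getStore().count(type);
  }
}
```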

##########
File path: common/kvstore/src/test/java/org/apache/spark/util/kvstore/HybridStoreSuite.java
##########
@@ -0,0 +1,217 @@
+package org.apache.spark.util.kvstore;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.*;
+
+public class HybridStoreSuite {
+  private LevelDB db;
+  private File dbpath;
+
+  @After
+  public void cleanup() throws Exception {
+    if (db != null) {
+      db.close();
+    }
+    if (dbpath != null) {
+      FileUtils.deleteQuietly(dbpath);
+    }
+  }
+
+  @Before
+  public void setup() throws Exception {
+    dbpath = File.createTempFile("test.", ".ldb");
+    dbpath.delete();
+    db = new LevelDB(dbpath);
+  }
+
+  @Test
+  public void testMultipleObjectWriteReadDelete() throws Exception {
+    HybridStore store = createHybridStore();
+
+    CustomType1 t1 = createCustomType1(1);
+    CustomType1 t2 = createCustomType1(2);
+
+    try {
+      store.read(t1.getClass(), t1.key);
+      fail("Expected exception for non-existent object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+
+    store.write(t1);
+    store.write(t2);
+
+    assertEquals(t1, store.read(t1.getClass(), t1.key));
+    assertEquals(t2, store.read(t2.getClass(), t2.key));
+    assertEquals(2L, store.count(t1.getClass()));
+
+    store.delete(t2.getClass(), t2.key);
+    assertEquals(1L, store.count(t1.getClass()));
+
+    switchToLevelDB(store);
+    assertEquals(t1, store.read(t1.getClass(), t1.key));
+    try {
+      store.read(t2.getClass(), t2.key);
+      fail("Expected exception for deleted object.");
+    } catch (NoSuchElementException nsee) {
+      // Expected.
+    }
+  }
+
+  @Test
+  public void testUpdate() throws Exception {
+    HybridStore store = createHybridStore();
+
+    CustomType1 t = createCustomType1(1);
+
+    store.write(t);
+
+    t.name = "anotherName";
+
+    store.write(t);
+
+    switchToLevelDB(store);
+    assertEquals(1, store.count(t.getClass()));
+    assertEquals("anotherName", store.read(t.getClass(), t.key).name);
+  }
+
+  @Test
+  public void testArrayIndices() throws Exception {
+    HybridStore store = createHybridStore();
+
+    ArrayKeyIndexType o = new ArrayKeyIndexType();
+    o.key = new int[] { 1, 2 };
+    o.id = new String[] { "3", "4" };
+
+    store.write(o);
+
+    switchToLevelDB(store);
+    assertEquals(o, store.read(ArrayKeyIndexType.class, o.key));
+    assertEquals(o, store.view(ArrayKeyIndexType.class).index("id").first(o.id).iterator().next());
+  }
+
+  @Test
+  public void testBasicIteration() throws Exception {
+    HybridStore store = createHybridStore();
+
+    CustomType1 t1 = createCustomType1(1);
+    store.write(t1);
+
+    CustomType1 t2 = createCustomType1(2);
+    store.write(t2);
+
+    switchToLevelDB(store);
+    assertEquals(t1.id, store.view(t1.getClass()).iterator().next().id);
+    assertEquals(t2.id, store.view(t1.getClass()).skip(1).iterator().next().id);
+    assertEquals(t2.id, store.view(t1.getClass()).skip(1).max(1).iterator().next().id);
+    assertEquals(t1.id,
+      store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id);
+    assertEquals(t2.id,
+      store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id);
+  }
+
+  @Test
+  public void testSkip() throws Exception {
+    HybridStore store = createHybridStore();
+
+    for (int i = 0; i < 10; i++) {
+      store.write(createCustomType1(i));
+    }
+
+    switchToLevelDB(store);
+    KVStoreIterator<CustomType1> it = store.view(CustomType1.class).closeableIterator();
+    assertTrue(it.hasNext());
+    assertTrue(it.skip(5));
+    assertEquals("key5", it.next().key);
+    assertTrue(it.skip(3));
+    assertEquals("key9", it.next().key);
+    assertFalse(it.hasNext());
+  }
+
+  @Test
+  public void testRejectWriting() throws Exception {
+    HybridStore store = createHybridStore();
+    switchToLevelDB(store);
+    try {
+      store.write(createCustomType1(1));
+      fail("Expected exception for writing object after switching to LevelDB");
+    } catch (RuntimeException re) {
+      // Expected.
+    }
+  }
+
+  private HybridStore createHybridStore() {
+    HybridStore store = new HybridStore();
+    store.setLevelDB(db);
+    return store;
+  }
+
+  private void switchToLevelDB(HybridStore store) throws Exception {
+    SwitchingListener listener = new SwitchingListener();
+    store.switchingToLevelDB(listener);
+    assert(listener.waitUntilDone());
+  }
+
+  private class SwitchingListener implements HybridStore.SwitchingToLevelDBListener {
+    private LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();
+
+    @Override
+    public void onSwitchingToLevelDBSuccess() {
+      try {
+        results.put("Succeed");

Review comment:
       +1 on the suggestion.
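The suggestion being +1'd isn't quoted here. Independently of it, here is a sketch of a test listener that records the outcome and waits with a timeout, so a switch that never completes fails the test instead of hanging it. `SwitchOutcomeListener`, its failure callback, and `RecordingSwitchListener` are hypothetical and only loosely modeled on HybridStore.SwitchingToLevelDBListener.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical listener; the failure callback's name and signature are assumptions.
interface SwitchOutcomeListener {
  void onSuccess();
  void onFailure(Exception e);
}

class RecordingSwitchListener implements SwitchOutcomeListener {
  private final LinkedBlockingQueue<String> results = new LinkedBlockingQueue<>();

  // offer() on an unbounded queue never blocks, so no InterruptedException handling is needed.
  @Override
  public void onSuccess() {
    results.offer("Succeed");
  }

  @Override
  public void onFailure(Exception e) {
    results.offer("Failed");
  }

  // Waits up to the timeout; returns true only if success was reported in time.
  boolean waitUntilDone(long timeout, TimeUnit unit) throws InterruptedException {
    String result = results.poll(timeout, unit);
    return "Succeed".equals(result);
  }
}
```

In the suite, JUnit's `assertTrue(listener.waitUntilDone(...))` would also be preferable to the bare `assert(...)` in switchToLevelDB(), since the Java `assert` statement is skipped unless the JVM runs with `-ea`.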

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type, index, indexedValue);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join();
+      }
+      if (levelDB != null) {
+        levelDB.close();
+      }
+      if (inMemoryStore != null) {
+        inMemoryStore.close();
+      }
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+    }
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    boolean removed = store.removeAllByIndexValues(klass, index, indexValues);
+    return removed;
+  }
+
+  public void setLevelDB(LevelDB levelDB) {
+    this.levelDB = levelDB;
+  }
+
+  public void setCachedQuantileKlass(Class<?> klass) {
+    this.cachedQuantileKlass = klass;
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and be started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  public void switchingToLevelDB(SwitchingToLevelDBListener listener) throws Exception {
+    // A background thread that dumps data to levelDB
+    backgroundThread = new Thread(new Runnable() {
+      public void run() {
+        Exception exceptionCaught = null;
+
+        try {
+          for (Class<?> klass : klassMap.keySet()) {
+            KVStoreIterator<?> it = inMemoryStore.view(klass).closeableIterator();
+            while (it.hasNext()) {
+              levelDB.write(it.next());
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();

Review comment:
       Please don't print to stdout/stderr. If the exception is logged elsewhere, it's fine not to print it here.
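A minimal sketch of the alternative: the background copy hands any failure to a callback, which can log it and mark the switch as failed, instead of calling printStackTrace(). `DumpWriter` and `BackgroundDump` are hypothetical names; in HybridStore the callback role would presumably belong to the SwitchingToLevelDBListener passed to switchingToLevelDB().

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical writer that, like LevelDB.write, may throw a checked exception.
interface DumpWriter {
  void write(Object value) throws Exception;
}

final class BackgroundDump {
  // Copies every item on a daemon thread. A failure goes to onFailure instead of
  // stderr, and onSuccess runs only if every write succeeded.
  static Thread start(List<?> items, DumpWriter writer,
                      Runnable onSuccess, Consumer<Exception> onFailure) {
    Thread t = new Thread(() -> {
      try {
        for (Object item : items) {
          writer.write(item);
        }
      } catch (Exception e) {
        onFailure.accept(e);
        return;
      }
      onSuccess.run();
    }, "background-dump");
    t.setDaemon(true);
    t.start();
    return t;
  }
}
```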

##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,18 @@ private[spark] object History {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store and having a background thread " +
+      "that dumps data to a disk store after the writing to in-memory store is completed. " +
+      "Use it with caution, as in-memory store requires higher memory usage.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(true)
+
+  val MAX_IN_MEMORY_STORE_USAGE = ConfigBuilder("spark.history.store.hybridStore.maxMemoryUsage")
+    .version("3.1.0")

Review comment:
       Please add a doc for this configuration as well, specifying the default value.
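A sketch of keeping the default in one place so the doc string can quote it, written in Java for consistency with the other examples here (the real entry is a Scala ConfigBuilder). The 2g default is an assumption taken from the "max 2g" mentioned later in this review, the doc wording is illustrative only, and `parseBytes` is a simplified, hypothetical stand-in for Spark's own byte-string parsing.

```java
import java.util.Locale;
import java.util.Map;

final class HybridStoreConfSketch {
  static final String KEY = "spark.history.store.hybridStore.maxMemoryUsage";
  // Assumed default; keeping it in one constant lets the doc quote it without drifting.
  static final String DEFAULT = "2g";
  static final String DOC = "Maximum memory space that can be used to create HybridStore "
      + "(default: " + DEFAULT + "). HybridStore shares the History Server's heap, "
      + "so increase the heap size accordingly when enabling it.";

  // Parses sizes such as "512m" or "2g" into bytes using binary units.
  static long parseBytes(String size) {
    String v = size.trim().toLowerCase(Locale.ROOT);
    char unit = v.charAt(v.length() - 1);
    long multiplier;
    switch (unit) {
      case 'k': multiplier = 1L << 10; break;
      case 'm': multiplier = 1L << 20; break;
      case 'g': multiplier = 1L << 30; break;
      default:
        // No recognized suffix: treat the whole string as a byte count.
        return Long.parseLong(v);
    }
    return Long.parseLong(v.substring(0, v.length() - 1)) * multiplier;
  }

  // Looks up the configured limit, falling back to the documented default.
  static long maxMemoryUsage(Map<String, String> conf) {
    return parseBytes(conf.getOrDefault(KEY, DEFAULT));
  }
}
```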

##########
File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/HybridStore.java
##########
@@ -0,0 +1,239 @@
+package org.apache.spark.util.kvstore;
+
+import org.apache.spark.annotation.Private;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Collection;
+
+/**
+ * Implementation of KVStore that writes data to InMemoryStore at first and uses
+ * a background thread to dump data to LevelDB once the writing to InMemoryStore
+ * is completed.
+ */
+@Private
+public class HybridStore implements KVStore {
+
+  private InMemoryStore inMemoryStore = new InMemoryStore();
+  private LevelDB levelDB = null;
+
+  // Flag to indicate if we should use inMemoryStore Or levelDB.
+  private AtomicBoolean shouldUseInMemoryStore = new AtomicBoolean(true);
+
+  // A background thread that dumps data in inMemoryStore to levelDB
+  private Thread backgroundThread = null;
+
+  // A hash map that stores all class types (except CachedQuantile) that had been writen
+  // to inMemoryStore.
+  private ConcurrentHashMap<Class<?>, Boolean> klassMap = new ConcurrentHashMap<>();
+
+  // CachedQuantile can be written to kvstore after rebuildAppStore(), so we need
+  // to handle it specially to avoid conflicts. We will use a queue store CachedQuantile
+  // objects when the underlying store is inMemoryStore, and dump these objects to levelDB
+  // before the switch completes.
+  private Class<?> cachedQuantileKlass = null;
+  private ConcurrentLinkedQueue<Object> cachedQuantileQueue = new ConcurrentLinkedQueue<>();
+
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+    KVStore store = getStore();
+    T metaData = store.getMetadata(klass);
+    return metaData;
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+    KVStore store = getStore();
+    store.setMetadata(value);
+  }
+
+  @Override
+  public <T> T read(Class<T> klass, Object naturalKey) throws Exception {
+    KVStore store = getStore();
+    T value = store.read(klass, naturalKey);
+    return value;
+  }
+
+  @Override
+  public void write(Object value) throws Exception {
+    Class<?> klass = value.getClass();
+
+    if (backgroundThread != null && !klass.equals(cachedQuantileKlass)) {
+      throw new RuntimeException("write() for objects other than CachedQuantile " +
+        "shouldn't be called after the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.write(value);
+
+    if (store instanceof InMemoryStore) {
+      if (klass.equals(cachedQuantileKlass)) {
+        cachedQuantileQueue.add(value);
+      } else {
+        klassMap.putIfAbsent(klass, true);
+      }
+    }
+  }
+
+  @Override
+  public void delete(Class<?> type, Object naturalKey) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("delete() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    store.delete(type, naturalKey);
+  }
+
+  @Override
+  public <T> KVStoreView<T> view(Class<T> type) throws Exception {
+    KVStore store = getStore();
+    KVStoreView<T> view = store.view(type);
+    return view;
+  }
+
+  @Override
+  public long count(Class<?> type) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type);
+    return count;
+  }
+
+  @Override
+  public long count(Class<?> type, String index, Object indexedValue) throws Exception {
+    KVStore store = getStore();
+    long count = store.count(type, index, indexedValue);
+    return count;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (backgroundThread != null && backgroundThread.isAlive()) {
+        // The background thread is still running, wait for it to finish
+        backgroundThread.join();
+      }
+      if (levelDB != null) {
+        levelDB.close();
+      }
+      if (inMemoryStore != null) {
+        inMemoryStore.close();
+      }
+    } catch (Exception e) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+    }
+  }
+
+  @Override
+  public <T> boolean removeAllByIndexValues(
+      Class<T> klass,
+      String index,
+      Collection<?> indexValues) throws Exception {
+    if (backgroundThread != null) {
+      throw new RuntimeException("removeAllByIndexValues() shouldn't be called after " +
+        "the hybrid store begins switching to levelDB");
+    }
+
+    KVStore store = getStore();
+    boolean removed = store.removeAllByIndexValues(klass, index, indexValues);
+    return removed;
+  }
+
+  public void setLevelDB(LevelDB levelDB) {
+    this.levelDB = levelDB;
+  }
+
+  public void setCachedQuantileKlass(Class<?> klass) {
+    this.cachedQuantileKlass = klass;
+  }
+
+  /**
+   * This method is called when the writing is done for inMemoryStore. A
+   * background thread will be created and be started to dump data in inMemoryStore
+   * to levelDB. Once the dumping is completed, the underlying kvstore will be
+   * switched to levelDB.
+   */
+  public void switchingToLevelDB(SwitchingToLevelDBListener listener) throws Exception {
+    // A background thread that dumps data to levelDB
+    backgroundThread = new Thread(new Runnable() {
+      public void run() {
+        Exception exceptionCaught = null;
+
+        try {
+          for (Class<?> klass : klassMap.keySet()) {
+            KVStoreIterator<?> it = inMemoryStore.view(klass).closeableIterator();
+            while (it.hasNext()) {
+              levelDB.write(it.next());
+            }
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+          exceptionCaught = e;
+        }
+
+        if (exceptionCaught == null) {
+          // Switch to levelDB and close inMemoryStore
+          shouldUseInMemoryStore.set(false);
+
+          // Dump CachedQuantile objects to levelDB
+          try {
+            while(cachedQuantileQueue.size() > 0) {
+              levelDB.write(cachedQuantileQueue.poll());
+            }
+          } catch (Exception e) {
+            e.printStackTrace();

Review comment:
       Same here. Also, isn't this exception part of a failure to switch, which shouldn't be swallowed?
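A sketch of treating the CachedQuantile drain as part of the switch, so a write failure there is reported as a switching failure rather than printed and swallowed. `QueueWriter`, `SwitchResult`, and `QueueDrain` are hypothetical; the ordering (flip the flag, then drain the queue) follows the quoted code.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical writer and result callback used only in this sketch.
interface QueueWriter {
  void write(Object value) throws Exception;
}

interface SwitchResult {
  void onSuccess();
  void onFailure(Exception e);
}

class QueueDrain {
  final AtomicBoolean useInMemory = new AtomicBoolean(true);
  final ConcurrentLinkedQueue<Object> pending = new ConcurrentLinkedQueue<>();

  // Flips to disk, then writes out objects buffered while in memory.
  // Success is reported only if every buffered object was written.
  void finishSwitch(QueueWriter disk, SwitchResult result) {
    useInMemory.set(false);
    try {
      Object next;
      // poll() returns null once the queue is empty.
      while ((next = pending.poll()) != null) {
        disk.write(next);
      }
    } catch (Exception e) {
      result.onFailure(e);
      return;
    }
    result.onSuccess();
  }
}
```

Polling until null also avoids `size()`, which on ConcurrentLinkedQueue traverses the whole queue and may be inaccurate while other threads modify it.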

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1167,6 +1172,17 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // At this point the disk data either does not exist or was deleted because it failed to
     // load, so the event log needs to be replayed.
 
+    // If hybrid store is enabled, try it first.

Review comment:
       The retry below exists because the compaction operation may run concurrently. HybridStore should follow the same retry logic.
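The existing retry loop isn't part of this hunk, so this is only a generic sketch of wrapping store creation in bounded retries. `RetrySketch.withRetries` is hypothetical, and retrying on any exception is a simplification: the real code would presumably retry only on the failure caused by a concurrent compaction.

```java
import java.util.concurrent.Callable;

final class RetrySketch {
  // Runs the action up to maxAttempts times and rethrows the last failure.
  static <T> T withRetries(int maxAttempts, Callable<T> action) throws Exception {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        // e.g. the event log changed underneath us; try again from scratch.
        last = e;
      }
    }
    throw last;
  }
}
```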

##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,18 @@ private[spark] object History {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store and having a background thread " +
+      "that dumps data to a disk store after the writing to in-memory store is completed. " +
+      "Use it with caution, as in-memory store requires higher memory usage.")

Review comment:
       I don't think this needs to be phrased as a caution, but it should mention the configuration below and how it affects heap memory usage.

##########
File path: core/src/main/scala/org/apache/spark/internal/config/History.scala
##########
@@ -195,4 +195,18 @@ private[spark] object History {
       .version("3.0.0")
       .booleanConf
       .createWithDefault(true)
+
+  val HYBRID_STORE_ENABLED = ConfigBuilder("spark.history.store.hybridStore.enabled")
+    .doc("Whether to use HybridStore as the store when parsing event logs. " +
+      "HybridStore will first write data to an in-memory store and having a background thread " +
+      "that dumps data to a disk store after the writing to in-memory store is completed. " +
+      "Use it with caution, as in-memory store requires higher memory usage.")
+    .version("3.1.0")
+    .booleanConf
+    .createWithDefault(true)

Review comment:
       Please set it to false by default: it consumes more memory (up to 2g), and end users may not notice the change and could run into OOM.

##########
File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
##########
@@ -1197,6 +1213,71 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     KVUtils.open(newStorePath, metadata)
   }
 
+  private def createHybridStore(
+      dm: HistoryServerDiskManager,
+      appId: String,
+      attempt: AttemptInfoWrapper,
+      metadata: AppStatusStoreMetadata): KVStore = {
+
+    val reader = EventLogFileReader(fs, new Path(logDir, attempt.logPath),
+      attempt.lastIndex)
+    val isCompressed = reader.compressionCodec.isDefined
+
+    val memoryUsage = approximateMemoryUsage(reader.totalSize, isCompressed)
+    if (currentInMemoryStoreUsage.get + memoryUsage > maxInMemoryStoreUsage) {
+      throw new IllegalStateException("Not enough in-memory storage to create hybrid store.")
+    }
+    currentInMemoryStoreUsage.addAndGet(memoryUsage)
+    logInfo(s"Attempt creating hybrid store to parse $appId / ${attempt.info.attemptId}. " +
+      s"Requested ${Utils.bytesToString(memoryUsage)} in-memory storage quota.")
+
+    logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
+    val lease = dm.lease(reader.totalSize, isCompressed)
+    val isLeaseRolledBack = new java.util.concurrent.atomic.AtomicBoolean(false)
+    var store: HybridStore = null
+    try {
+      store = new HybridStore()
+      val levelDB = KVUtils.open(lease.tmpPath, metadata)
+      store.setLevelDB(levelDB)
+      store.setCachedQuantileKlass(classOf[CachedQuantile])

Review comment:
       Since HybridStore depends on logic in AppStatusStore (and is hence specific to SHS), I don't think it needs to live in the common-kvstore module. HybridStore can be moved to the core module (which makes this kind of workaround unnecessary), and that's also a chance to simplify the code (and the test code) by implementing it in Scala.
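Separately from where the class lives, the quota check quoted above reads `currentInMemoryStoreUsage.get` and then calls `addAndGet` as a separate step, so two apps loading at the same time could both pass the check and together exceed `maxInMemoryStoreUsage`. Below is a sketch of an atomic reserve/release using a compare-and-set loop; `MemoryQuota` is a hypothetical helper, shown in Java to match the other examples.

```java
import java.util.concurrent.atomic.AtomicLong;

final class MemoryQuota {
  private final long max;
  private final AtomicLong used = new AtomicLong(0L);

  MemoryQuota(long max) {
    this.max = max;
  }

  // Atomically reserves bytes if the total stays within max; returns false otherwise.
  boolean tryReserve(long bytes) {
    while (true) {
      long current = used.get();
      if (current + bytes > max) {
        return false;
      }
      if (used.compareAndSet(current, current + bytes)) {
        return true;
      }
      // Another thread changed the usage concurrently; re-read and retry.
    }
  }

  // Returns a reservation, e.g. once the store has switched to disk or been closed.
  void release(long bytes) {
    used.addAndGet(-bytes);
  }

  long used() {
    return used.get();
  }
}
```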




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org