You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/15 23:56:10 UTC

[GitHub] [spark] vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads

vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix performance of ElementTrackingStore deletes when using InMemoryStore under high loads
URL: https://github.com/apache/spark/pull/24616#discussion_r284491196
 
 

 ##########
 File path: common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java
 ##########
 @@ -126,64 +132,144 @@ public void close() {
     return (Comparable<Object>) in;
   }
 
-  private static class InstanceList {
+  @SuppressWarnings("unchecked")
+  private static <T> KVStoreView<T> emptyView() {
+    return (InMemoryView<T>) InMemoryView.EMPTY_VIEW;
+  }
+
+  /**
+   * Encapsulates ConcurrentHashMap so that the typing in and out of the map strictly maps a
+   * class of type T to an InstanceList of type T.
+   */
+  private static class InMemoryLists {
+    private ConcurrentMap<Class<?>, InstanceList<?>> data = new ConcurrentHashMap<>();
+
+    @SuppressWarnings("unchecked")
+    public <T> InstanceList<T> get(Class<T> type) {
+      return (InstanceList<T>)data.get(type);
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> void write(T value) throws Exception {
+      InstanceList<T> list =
+        (InstanceList<T>) data.computeIfAbsent(value.getClass(), InstanceList::new);
+      list.put(value);
+    }
+
+    public void clear() {
+      data.clear();
+    }
+  }
+
+  private static class InstanceList<T> {
+
+    private static class CountingRemoveIfForEach<T> implements BiConsumer<Comparable<Object>, T> {
+      ConcurrentMap<Comparable<Object>, T> data;
+      Predicate<? super T> filter;
+      int count = 0;
+
+      CountingRemoveIfForEach(
+          ConcurrentMap<Comparable<Object>, T> data,
+          Predicate<? super T> filter) {
+        this.data = data;
+        this.filter = filter;
+      }
+
+      public void accept(Comparable<Object> key, T value) {
+        // To address https://bugs.openjdk.java.net/browse/JDK-8078645 which affects remove() on
 
 Review comment:
   This comment sounds like it belongs at this class's declaration, not inside this method?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org