You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2018/01/19 19:32:32 UTC

spark git commit: [SPARK-23103][CORE] Ensure correct sort order for negative values in LevelDB.

Repository: spark
Updated Branches:
  refs/heads/master fed2139f0 -> aa3a1276f


[SPARK-23103][CORE] Ensure correct sort order for negative values in LevelDB.

The code was sorting "0" as "less than" negative values, which is wrong:
zero was being tagged with the negative marker instead of the positive one.
The fix is simple; most of the changes are the added test and related
cleanup.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20284 from vanzin/SPARK-23103.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa3a1276
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa3a1276
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa3a1276

Branch: refs/heads/master
Commit: aa3a1276f9e23ffbb093d00743e63cd4369f9f57
Parents: fed2139
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Jan 19 13:32:20 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Fri Jan 19 13:32:20 2018 -0600

----------------------------------------------------------------------
 .../spark/util/kvstore/LevelDBTypeInfo.java     |  2 +-
 .../spark/util/kvstore/DBIteratorSuite.java     |  7 +-
 .../apache/spark/util/kvstore/LevelDBSuite.java | 77 ++++++++++----------
 .../spark/status/AppStatusListenerSuite.scala   |  8 +-
 4 files changed, 52 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
index 232ee41..f4d3592 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
@@ -493,7 +493,7 @@ class LevelDBTypeInfo {
         byte[] key = new byte[bytes * 2 + 2];
         long longValue = ((Number) value).longValue();
         key[0] = prefix;
-        key[1] = longValue > 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
+        key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
 
         for (int i = 0; i < key.length - 2; i++) {
           int masked = (int) ((longValue >>> (4 * i)) & 0xF);

http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
index 9a81f86..1e06243 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
@@ -73,7 +73,9 @@ public abstract class DBIteratorSuite {
   private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
   private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
   private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
-  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> {
+    return Integer.valueOf(t1.num).compareTo(t2.num);
+  };
   private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
 
   /**
@@ -112,7 +114,8 @@ public abstract class DBIteratorSuite {
       t.key = "key" + i;
       t.id = "id" + i;
       t.name = "name" + RND.nextInt(MAX_ENTRIES);
-      t.num = RND.nextInt(MAX_ENTRIES);
+      // Force one item to have an integer value of zero to test the fix for SPARK-23103.
+      t.num = (i != 0) ? (int) RND.nextLong() : 0;
       t.child = "child" + (i % MIN_ENTRIES);
       allEntries.add(t);
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index 2b07d24..b8123ac 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -21,6 +21,8 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.io.FileUtils;
 import org.iq80.leveldb.DBIterator;
@@ -74,11 +76,7 @@ public class LevelDBSuite {
 
   @Test
   public void testObjectWriteReadDelete() throws Exception {
-    CustomType1 t = new CustomType1();
-    t.key = "key";
-    t.id = "id";
-    t.name = "name";
-    t.child = "child";
+    CustomType1 t = createCustomType1(1);
 
     try {
       db.read(CustomType1.class, t.key);
@@ -106,17 +104,9 @@ public class LevelDBSuite {
 
   @Test
   public void testMultipleObjectWriteReadDelete() throws Exception {
-    CustomType1 t1 = new CustomType1();
-    t1.key = "key1";
-    t1.id = "id";
-    t1.name = "name1";
-    t1.child = "child1";
-
-    CustomType1 t2 = new CustomType1();
-    t2.key = "key2";
-    t2.id = "id";
-    t2.name = "name2";
-    t2.child = "child2";
+    CustomType1 t1 = createCustomType1(1);
+    CustomType1 t2 = createCustomType1(2);
+    t2.id = t1.id;
 
     db.write(t1);
     db.write(t2);
@@ -142,11 +132,7 @@ public class LevelDBSuite {
 
   @Test
   public void testMultipleTypesWriteReadDelete() throws Exception {
-    CustomType1 t1 = new CustomType1();
-    t1.key = "1";
-    t1.id = "id";
-    t1.name = "name1";
-    t1.child = "child1";
+    CustomType1 t1 = createCustomType1(1);
 
     IntKeyType t2 = new IntKeyType();
     t2.key = 2;
@@ -188,10 +174,7 @@ public class LevelDBSuite {
   public void testMetadata() throws Exception {
     assertNull(db.getMetadata(CustomType1.class));
 
-    CustomType1 t = new CustomType1();
-    t.id = "id";
-    t.name = "name";
-    t.child = "child";
+    CustomType1 t = createCustomType1(1);
 
     db.setMetadata(t);
     assertEquals(t, db.getMetadata(CustomType1.class));
@@ -202,11 +185,7 @@ public class LevelDBSuite {
 
   @Test
   public void testUpdate() throws Exception {
-    CustomType1 t = new CustomType1();
-    t.key = "key";
-    t.id = "id";
-    t.name = "name";
-    t.child = "child";
+    CustomType1 t = createCustomType1(1);
 
     db.write(t);
 
@@ -222,13 +201,7 @@ public class LevelDBSuite {
   @Test
   public void testSkip() throws Exception {
     for (int i = 0; i < 10; i++) {
-      CustomType1 t = new CustomType1();
-      t.key = "key" + i;
-      t.id = "id" + i;
-      t.name = "name" + i;
-      t.child = "child" + i;
-
-      db.write(t);
+      db.write(createCustomType1(i));
     }
 
     KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
@@ -240,6 +213,36 @@ public class LevelDBSuite {
     assertFalse(it.hasNext());
   }
 
+  @Test
+  public void testNegativeIndexValues() throws Exception {
+    List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);
+
+    expected.stream().forEach(i -> {
+      try {
+        db.write(createCustomType1(i));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    List<Integer> results = StreamSupport
+      .stream(db.view(CustomType1.class).index("int").spliterator(), false)
+      .map(e -> e.num)
+      .collect(Collectors.toList());
+
+    assertEquals(expected, results);
+  }
+
+  private CustomType1 createCustomType1(int i) {
+    CustomType1 t = new CustomType1();
+    t.key = "key" + i;
+    t.id = "id" + i;
+    t.name = "name" + i;
+    t.num = i;
+    t.child = "child" + i;
+    return t;
+  }
+
   private int countKeys(Class<?> type) throws Exception {
     byte[] prefix = db.getTypeInfo(type).keyPrefix();
     int count = 0;

http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index ca66b6b..e7981be 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -894,15 +894,19 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     val dropped = stages.drop(1).head
 
     // Cache some quantiles by calling AppStatusStore.taskSummary(). For quantiles to be
-    // calculcated, we need at least one finished task.
+    // calculated, we need at least one finished task. The code in AppStatusStore uses
+    // `executorRunTime` to detect valid tasks, so that metric needs to be updated in the
+    // task end event.
     time += 1
     val task = createTasks(1, Array("1")).head
     listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))
 
     time += 1
     task.markFinished(TaskState.FINISHED, time)
+    val metrics = TaskMetrics.empty
+    metrics.setExecutorRunTime(42L)
     listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
-      "taskType", Success, task, null))
+      "taskType", Success, task, metrics))
 
     new AppStatusStore(store)
       .taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org