You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2018/01/19 19:32:32 UTC
spark git commit: [SPARK-23103][CORE] Ensure correct sort order for
negative values in LevelDB.
Repository: spark
Updated Branches:
refs/heads/master fed2139f0 -> aa3a1276f
[SPARK-23103][CORE] Ensure correct sort order for negative values in LevelDB.
The code was sorting "0" as "less than" negative values, which is a little
wrong. The fix is simple; most of the changes are the added test and related
cleanup.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #20284 from vanzin/SPARK-23103.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa3a1276
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa3a1276
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa3a1276
Branch: refs/heads/master
Commit: aa3a1276f9e23ffbb093d00743e63cd4369f9f57
Parents: fed2139
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri Jan 19 13:32:20 2018 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Fri Jan 19 13:32:20 2018 -0600
----------------------------------------------------------------------
.../spark/util/kvstore/LevelDBTypeInfo.java | 2 +-
.../spark/util/kvstore/DBIteratorSuite.java | 7 +-
.../apache/spark/util/kvstore/LevelDBSuite.java | 77 ++++++++++----------
.../spark/status/AppStatusListenerSuite.scala | 8 +-
4 files changed, 52 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
index 232ee41..f4d3592 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java
@@ -493,7 +493,7 @@ class LevelDBTypeInfo {
byte[] key = new byte[bytes * 2 + 2];
long longValue = ((Number) value).longValue();
key[0] = prefix;
- key[1] = longValue > 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
+ key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;
for (int i = 0; i < key.length - 2; i++) {
int masked = (int) ((longValue >>> (4 * i)) & 0xF);
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
index 9a81f86..1e06243 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/DBIteratorSuite.java
@@ -73,7 +73,9 @@ public abstract class DBIteratorSuite {
private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key);
private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id);
private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name);
- private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num;
+ private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> {
+ return Integer.valueOf(t1.num).compareTo(t2.num);
+ };
private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child);
/**
@@ -112,7 +114,8 @@ public abstract class DBIteratorSuite {
t.key = "key" + i;
t.id = "id" + i;
t.name = "name" + RND.nextInt(MAX_ENTRIES);
- t.num = RND.nextInt(MAX_ENTRIES);
+ // Force one item to have an integer value of zero to test the fix for SPARK-23103.
+ t.num = (i != 0) ? (int) RND.nextLong() : 0;
t.child = "child" + (i % MIN_ENTRIES);
allEntries.add(t);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
index 2b07d24..b8123ac 100644
--- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
+++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java
@@ -21,6 +21,8 @@ import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.commons.io.FileUtils;
import org.iq80.leveldb.DBIterator;
@@ -74,11 +76,7 @@ public class LevelDBSuite {
@Test
public void testObjectWriteReadDelete() throws Exception {
- CustomType1 t = new CustomType1();
- t.key = "key";
- t.id = "id";
- t.name = "name";
- t.child = "child";
+ CustomType1 t = createCustomType1(1);
try {
db.read(CustomType1.class, t.key);
@@ -106,17 +104,9 @@ public class LevelDBSuite {
@Test
public void testMultipleObjectWriteReadDelete() throws Exception {
- CustomType1 t1 = new CustomType1();
- t1.key = "key1";
- t1.id = "id";
- t1.name = "name1";
- t1.child = "child1";
-
- CustomType1 t2 = new CustomType1();
- t2.key = "key2";
- t2.id = "id";
- t2.name = "name2";
- t2.child = "child2";
+ CustomType1 t1 = createCustomType1(1);
+ CustomType1 t2 = createCustomType1(2);
+ t2.id = t1.id;
db.write(t1);
db.write(t2);
@@ -142,11 +132,7 @@ public class LevelDBSuite {
@Test
public void testMultipleTypesWriteReadDelete() throws Exception {
- CustomType1 t1 = new CustomType1();
- t1.key = "1";
- t1.id = "id";
- t1.name = "name1";
- t1.child = "child1";
+ CustomType1 t1 = createCustomType1(1);
IntKeyType t2 = new IntKeyType();
t2.key = 2;
@@ -188,10 +174,7 @@ public class LevelDBSuite {
public void testMetadata() throws Exception {
assertNull(db.getMetadata(CustomType1.class));
- CustomType1 t = new CustomType1();
- t.id = "id";
- t.name = "name";
- t.child = "child";
+ CustomType1 t = createCustomType1(1);
db.setMetadata(t);
assertEquals(t, db.getMetadata(CustomType1.class));
@@ -202,11 +185,7 @@ public class LevelDBSuite {
@Test
public void testUpdate() throws Exception {
- CustomType1 t = new CustomType1();
- t.key = "key";
- t.id = "id";
- t.name = "name";
- t.child = "child";
+ CustomType1 t = createCustomType1(1);
db.write(t);
@@ -222,13 +201,7 @@ public class LevelDBSuite {
@Test
public void testSkip() throws Exception {
for (int i = 0; i < 10; i++) {
- CustomType1 t = new CustomType1();
- t.key = "key" + i;
- t.id = "id" + i;
- t.name = "name" + i;
- t.child = "child" + i;
-
- db.write(t);
+ db.write(createCustomType1(i));
}
KVStoreIterator<CustomType1> it = db.view(CustomType1.class).closeableIterator();
@@ -240,6 +213,36 @@ public class LevelDBSuite {
assertFalse(it.hasNext());
}
+ @Test
+ public void testNegativeIndexValues() throws Exception {
+ List<Integer> expected = Arrays.asList(-100, -50, 0, 50, 100);
+
+ expected.stream().forEach(i -> {
+ try {
+ db.write(createCustomType1(i));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ List<Integer> results = StreamSupport
+ .stream(db.view(CustomType1.class).index("int").spliterator(), false)
+ .map(e -> e.num)
+ .collect(Collectors.toList());
+
+ assertEquals(expected, results);
+ }
+
+ private CustomType1 createCustomType1(int i) {
+ CustomType1 t = new CustomType1();
+ t.key = "key" + i;
+ t.id = "id" + i;
+ t.name = "name" + i;
+ t.num = i;
+ t.child = "child" + i;
+ return t;
+ }
+
private int countKeys(Class<?> type) throws Exception {
byte[] prefix = db.getTypeInfo(type).keyPrefix();
int count = 0;
http://git-wip-us.apache.org/repos/asf/spark/blob/aa3a1276/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index ca66b6b..e7981be 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -894,15 +894,19 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val dropped = stages.drop(1).head
// Cache some quantiles by calling AppStatusStore.taskSummary(). For quantiles to be
- // calculcated, we need at least one finished task.
+ // calculated, we need at least one finished task. The code in AppStatusStore uses
+ // `executorRunTime` to detect valid tasks, so that metric needs to be updated in the
+ // task end event.
time += 1
val task = createTasks(1, Array("1")).head
listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))
time += 1
task.markFinished(TaskState.FINISHED, time)
+ val metrics = TaskMetrics.empty
+ metrics.setExecutorRunTime(42L)
listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
- "taskType", Success, task, null))
+ "taskType", Success, task, metrics))
new AppStatusStore(store)
.taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org