You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/29 06:57:24 UTC
svn commit: r1682365 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
test/org/apache/pig/spark/TestIndexedKey.java test/spark-local-tests
Author: xuefu
Date: Fri May 29 04:57:24 2015
New Revision: 1682365
URL: http://svn.apache.org/r1682365
Log:
PIG-4284: Enable unit test TestJoin for spark (Liyun via Xuefu)
Added:
pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
pig/branches/spark/test/spark-local-tests
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1682365&r1=1682364&r2=1682365&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri May 29 04:57:24 2015
@@ -91,10 +91,10 @@ public class GlobalRearrangeConverter im
JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
return jrdd2.rdd();
} else {
- List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
+ List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
for (RDD<Tuple> rdd : predecessors) {
JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
- JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
+ JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
rddPairs.add(rddPair.rdd());
}
@@ -105,8 +105,8 @@ public class GlobalRearrangeConverter im
.asScalaBuffer(rddPairs).toSeq()),
new HashPartitioner(parallelism));
- RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
- (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+ RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+ (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
}
}
@@ -289,20 +289,134 @@ public class GlobalRearrangeConverter im
}
}
+    /**
+     * IndexedKey pairs a tuple's key with the index of the input (table) the tuple came from, so
+     * that {@link #equals(Object)} can treat null keys differently depending on whether two keys
+     * originate from the same input or from different inputs.
+     */
+    public static class IndexedKey implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        /** Input index, taken from field 0 of the incoming (index, key, value) tuple. */
+        private final byte index;
+        /** The key itself; may be null, or a Tuple that contains null fields. */
+        private final Object key;
+
+        /**
+         * @param index index of the input the tuple belongs to
+         * @param key   the tuple's key; may be null
+         */
+        public IndexedKey(byte index, Object key) {
+            this.index = index;
+            this.key = key;
+        }
+
+        /** @return the wrapped key, without the input index */
+        public Object getKey() {
+            return key;
+        }
+
+        @Override
+        public String toString() {
+            return "IndexedKey{index=" + index + ", key=" + key + '}';
+        }
+
+        /**
+         * Compares this key with another IndexedKey.
+         *
+         * <p>Same index (tuples of the same input): only the keys are compared, and two
+         * {@code null} keys are equal. Different index (tuples of different inputs): the keys must
+         * both be non-null and equal, and the key must not be a {@link Tuple} containing a
+         * {@code null} field. NOTE(review): presumably this keeps null join keys from different
+         * inputs from matching, per Pig's join semantics for nulls; confirm.
+         *
+         * <pre>
+         * Same index:                   Different index:
+         * key1   key2   equal?          key1   key2   equal?
+         * null   null   Y               null   null   N
+         * foo    null   N               foo    null   N
+         * null   foo    N               null   foo    N
+         * foo    foo    Y               foo    foo    Y
+         * (1,1)  (1,1)  Y               (1,1)  (1,1)  Y
+         * (1,)   (1,)   Y               (1,)   (1,)   N
+         * (1,2)  (1,2)  Y               (1,2)  (1,2)  Y
+         * </pre>
+         *
+         * @param o the object to compare with
+         * @return true if {@code o} is an IndexedKey considered equal to this one
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            IndexedKey that = (IndexedKey) o;
+            if (key == null || that.key == null) {
+                // A null key only matches another null key, and only within the same input.
+                return index == that.index && key == that.key;
+            }
+            if (index == that.index) {
+                return key.equals(that.key);
+            }
+            // Different inputs: tuple keys with a null field never match.
+            return key.equals(that.key) && !containsNullField(key);
+        }
+
+        /**
+         * Returns true if {@code candidate} is a {@link Tuple} with at least one top-level
+         * {@code null} field. Non-tuple values yield false; nested tuples are not inspected.
+         */
+        private static boolean containsNullField(Object candidate) {
+            if (!(candidate instanceof Tuple)) {
+                return false;
+            }
+            Tuple tuple = (Tuple) candidate;
+            for (int i = 0; i < tuple.size(); i++) {
+                try {
+                    if (tuple.get(i) == null) {
+                        return true;
+                    }
+                } catch (ExecException e) {
+                    throw new RuntimeException("exception found in containNullfields", e);
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Hashes by key when present, otherwise by index.
+         *
+         * <p>Consistent with {@link #equals(Object)}: equal non-null keys share
+         * {@code key.hashCode()}, and a null key only ever equals another null key with the same
+         * index.
+         */
+        @Override
+        public int hashCode() {
+            return key == null ? index : key.hashCode();
+        }
+    }
+
private static class ToKeyValueFunction implements
- Function<Tuple, Tuple2<Object, Tuple>>, Serializable {
+ Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
@Override
- public Tuple2<Object, Tuple> call(Tuple t) {
+ public Tuple2<IndexedKey, Tuple> call(Tuple t) {
try {
// (index, key, value)
if (LOG.isDebugEnabled()) {
LOG.debug("ToKeyValueFunction in " + t);
}
- Object key = t.get(1);
+ IndexedKey indexedKey = new IndexedKey((Byte) t.get(0), t.get(1));
Tuple value = (Tuple) t.get(2); // value
// (key, value)
- Tuple2<Object, Tuple> out = new Tuple2<Object, Tuple>(key,
+ Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
value);
if (LOG.isDebugEnabled()) {
LOG.debug("ToKeyValueFunction out " + out);
@@ -315,15 +429,15 @@ public class GlobalRearrangeConverter im
}
private static class ToGroupKeyValueFunction implements
- Function<Tuple2<Object, Seq<Seq<Tuple>>>, Tuple>, Serializable {
+ Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
@Override
- public Tuple call(Tuple2<Object, Seq<Seq<Tuple>>> input) {
+ public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("ToGroupKeyValueFunction2 in " + input);
}
- final Object key = input._1();
+ final Object key = input._1().getKey();
Object obj = input._2();
// XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
Seq<Tuple>[] bags = (Seq<Tuple>[])obj;
Added: pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java?rev=1682365&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java (added)
+++ pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java Fri May 29 04:57:24 2015
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.converter
+ .GlobalRearrangeConverter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestIndexedKey {
+
+ /**Case1:Compare IndexedKeys with same index value
+ * key1 key2 equal? hashCode1 hashCode2
+ * foo null N hashCode(foo) index
+ * null foo N index hashCode(foo)
+ * foo foo Y hashCode(foo) hashCode(foo)
+ * null null Y index index
+ * (1,1) (1,1) Y hashCode((1,1)) hashCode((1,1))
+ * (1,) (1,) Y hashCode((1,)) hashCode((1,))
+ * (1,1) (1,2) N hashCode((1,1)) hashCode((1,2))
+ */
+ @Test
+ public void testIndexedKeyWithSameIndexValue() throws Exception {
+ GlobalRearrangeConverter.IndexedKey a0 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
+ GlobalRearrangeConverter.IndexedKey a1 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
+ assertEquals(a0.equals(a1), false);
+ assertEquals(a0.hashCode()==a1.hashCode(),false);
+
+ GlobalRearrangeConverter.IndexedKey a2 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
+ GlobalRearrangeConverter.IndexedKey a3 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
+ assertEquals(a2.equals(a3),false);
+ assertEquals(a2.hashCode()==a3.hashCode(),false);
+
+ GlobalRearrangeConverter.IndexedKey a4 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
+ GlobalRearrangeConverter.IndexedKey a5 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
+ assertEquals(a4.equals(a5),true);
+ assertEquals(a4.hashCode()==a5.hashCode(),true);
+
+ GlobalRearrangeConverter.IndexedKey a6 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
+ GlobalRearrangeConverter.IndexedKey a7 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
+ assertEquals(a6.equals(a7),true);
+ assertEquals(a6.hashCode()==a7.hashCode(),true);
+
+ Tuple t1 = TupleFactory.getInstance().newTuple(2);
+ t1.set(0,"1");
+ t1.set(1,"1");
+ Tuple t2 = TupleFactory.getInstance().newTuple(2);
+ t2.set(0,"1");
+ t2.set(1,"1");
+ GlobalRearrangeConverter.IndexedKey a8 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),t1);
+ GlobalRearrangeConverter.IndexedKey a9 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),t2);
+ assertEquals(a8.equals(a9),true);
+ assertEquals(a8.hashCode()==a9.hashCode(),true);
+
+ Tuple t3 = TupleFactory.getInstance().newTuple(2);
+ t3.set(0,"1");
+ t3.set(1,null);
+ Tuple t4 = TupleFactory.getInstance().newTuple(2);
+ t4.set(0,"1");
+ t4.set(1,null);
+ GlobalRearrangeConverter.IndexedKey a10 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),t3);
+ GlobalRearrangeConverter.IndexedKey a11 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),t4);
+ assertEquals(a10.equals(a11),true);
+ assertEquals(a10.hashCode()==a11.hashCode(),true);
+
+ Tuple t5 = TupleFactory.getInstance().newTuple(2);
+ t5.set(0,"1");
+ t5.set(1,"1");
+ Tuple t6 = TupleFactory.getInstance().newTuple(2);
+ t6.set(0,"1");
+ t6.set(1,"2");
+ GlobalRearrangeConverter.IndexedKey a12 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),t5);
+ GlobalRearrangeConverter.IndexedKey a13 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"),t6);
+ assertEquals(a12.equals(a13),false);
+ assertEquals(a12.hashCode()==a13.hashCode(),false);
+ }
+
+ /*
+ * Case2:Compare IndexedKeys with different index value
+ * key1 key2 equal? hashCode1 hashCode2
+ * foo null N hashCode(foo) index2
+ * null foo N index1 hashCode(foo)
+ * foo foo Y hashCode(foo) hashCode(foo)
+ * null null N index1 index2
+ * (1,1) (1,1) Y hashCode((1,1)) hashCode((1,1))
+ * (1,) (1,) N hashCode((1,)) hashCode((1,))
+ * (1,1) (1,2) N hashCode((1,1)) hashCode((1,2))
+ */
+ @Test
+ public void testIndexedKeyWithDifferentIndexValue() throws Exception {
+ GlobalRearrangeConverter.IndexedKey a0 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"), "foo");
+ GlobalRearrangeConverter.IndexedKey a1 = new GlobalRearrangeConverter.IndexedKey(new Byte("1"), null);
+ assertEquals(a0.equals(a1), false);
+ assertEquals(a0.hashCode() == a1.hashCode(), false);
+
+ GlobalRearrangeConverter.IndexedKey a2 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"), null);
+ GlobalRearrangeConverter.IndexedKey a3 = new GlobalRearrangeConverter.IndexedKey(new Byte("1"), "foo");
+ assertEquals(a2.equals(a3), false);
+ assertEquals(a2.hashCode() == a3.hashCode(), false);
+
+ GlobalRearrangeConverter.IndexedKey a4 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"), "foo");
+ GlobalRearrangeConverter.IndexedKey a5 = new GlobalRearrangeConverter.IndexedKey(new Byte("1"), "foo");
+ assertEquals(a4.equals(a5), true);
+ assertEquals(a4.hashCode() == a5.hashCode(), true);
+
+ GlobalRearrangeConverter.IndexedKey a6 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"), null);
+ GlobalRearrangeConverter.IndexedKey a7 = new GlobalRearrangeConverter.IndexedKey(new Byte("1"), null);
+ assertEquals(a6.equals(a7), false);
+ assertEquals(a6.hashCode() == a7.hashCode(), false);
+
+ Tuple t1 = TupleFactory.getInstance().newTuple(2);
+ t1.set(0, "1");
+ t1.set(1, "1");
+ Tuple t2 = TupleFactory.getInstance().newTuple(2);
+ t2.set(0, "1");
+ t2.set(1, "1");
+ GlobalRearrangeConverter.IndexedKey a8 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"), t1);
+ GlobalRearrangeConverter.IndexedKey a9 = new GlobalRearrangeConverter.IndexedKey(new Byte("1"), t2);
+ assertEquals(a8.equals(a9), true);
+ assertEquals(a8.hashCode() == a9.hashCode(), true);
+
+ Tuple t3 = TupleFactory.getInstance().newTuple(2);
+ t3.set(0, "1");
+ t3.set(1, null);
+ Tuple t4 = TupleFactory.getInstance().newTuple(2);
+ t4.set(0, "1");
+ t4.set(1, null);
+ GlobalRearrangeConverter.IndexedKey a10 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"), t3);
+ GlobalRearrangeConverter.IndexedKey a11 = new GlobalRearrangeConverter.IndexedKey(new Byte("1"), t4);
+ assertEquals(a10.equals(a11), false);
+ assertEquals(a10.hashCode() == a11.hashCode(), true); //hashcode of a10 and a11 are equal but they are not equal
+
+ Tuple t5 = TupleFactory.getInstance().newTuple(2);
+ t5.set(0, "1");
+ t5.set(1, "1");
+ Tuple t6 = TupleFactory.getInstance().newTuple(2);
+ t6.set(0, "1");
+ t6.set(1, "2");
+ GlobalRearrangeConverter.IndexedKey a12 = new GlobalRearrangeConverter.IndexedKey(new Byte("0"), t5);
+ GlobalRearrangeConverter.IndexedKey a13 = new GlobalRearrangeConverter.IndexedKey(new Byte("1"), t6);
+ assertEquals(a12.equals(a13), false);
+ assertEquals(a12.hashCode() == a13.hashCode(), false);
+ }
+}
Modified: pig/branches/spark/test/spark-local-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/spark-local-tests?rev=1682365&r1=1682364&r2=1682365&view=diff
==============================================================================
--- pig/branches/spark/test/spark-local-tests (original)
+++ pig/branches/spark/test/spark-local-tests Fri May 29 04:57:24 2015
@@ -82,3 +82,4 @@
**/TestStoreLocal.java
**/TestMultiQueryLocal.java
**/TestUnionOnSchema.java
+**/TestIndexedKey.java