You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2015/05/29 06:57:24 UTC

svn commit: r1682365 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java test/org/apache/pig/spark/TestIndexedKey.java test/spark-local-tests

Author: xuefu
Date: Fri May 29 04:57:24 2015
New Revision: 1682365

URL: http://svn.apache.org/r1682365
Log:
PIG-4284: Enable unit test TestJoin for spark (Liyun via Xuefu)

Added:
    pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/branches/spark/test/spark-local-tests

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1682365&r1=1682364&r2=1682365&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java Fri May 29 04:57:24 2015
@@ -91,10 +91,10 @@ public class GlobalRearrangeConverter im
             JavaRDD<Tuple> jrdd2 = prdd.map(new GroupTupleFunction(physicalOperator));
             return jrdd2.rdd();
         } else {
-            List<RDD<Tuple2<Object, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<Object, Tuple>>>();
+            List<RDD<Tuple2<IndexedKey, Tuple>>> rddPairs = new ArrayList<RDD<Tuple2<IndexedKey, Tuple>>>();
             for (RDD<Tuple> rdd : predecessors) {
                 JavaRDD<Tuple> jrdd = JavaRDD.fromRDD(rdd, SparkUtil.getManifest(Tuple.class));
-                JavaRDD<Tuple2<Object, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
+                JavaRDD<Tuple2<IndexedKey, Tuple>> rddPair = jrdd.map(new ToKeyValueFunction());
                 rddPairs.add(rddPair.rdd());
             }
 
@@ -105,8 +105,8 @@ public class GlobalRearrangeConverter im
                             .asScalaBuffer(rddPairs).toSeq()),
                     new HashPartitioner(parallelism));
 
-            RDD<Tuple2<Object, Seq<Seq<Tuple>>>> rdd =
-                (RDD<Tuple2<Object, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
+            RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>> rdd =
+                (RDD<Tuple2<IndexedKey, Seq<Seq<Tuple>>>>) (Object) coGroupedRDD;
             return rdd.toJavaRDD().map(new ToGroupKeyValueFunction()).rdd();
         }
     }
@@ -289,20 +289,134 @@ public class GlobalRearrangeConverter im
         }
     }
 
+    /**
+     * Pairs a tuple's key with its index byte; equals() and hashCode() below give null keys special treatment.
+     */
+    public static class IndexedKey implements Serializable {
+        private byte index;
+        private Object key; // may be null; equals() and hashCode() both branch on that
+
+        public IndexedKey(byte index, Object key) {
+            this.index = index;
+            this.key = key;
+        }
+
+        public Object getKey() {
+            return key;
+        }
+
+        @Override
+        public String toString() {
+            return "IndexedKey{" +
+                    "index=" + index +
+                    ", key=" + key +
+                    '}';
+        }
+
+        /**
+         * If the key is null (or is a Tuple with a null field), equality is computed from both the key and the index.
+         * If the key has no nulls, equality is computed from just the key (like we normally do).
+         * There are two possible cases when two tuples are compared:
+         * 1) Compare tuples of same table (same index)
+         * 2) Compare tuples of different tables (different index values)
+         * In 1) (same index: plain key comparison, and two null keys are equal)
+         * key1    key2    equal?
+         * null    null      Y
+         * foo     null      N
+         * null    foo       N
+         * foo     foo       Y
+         * (1,1)   (1,1)     Y
+         * (1,)    (1,)      Y
+         * (1,2)   (1,2)     Y
+         *
+         *
+         * In 2) (different index: a null key, or a Tuple key with a null field, never matches)
+         * key1    key2    equal?
+         * null    null     N
+         * foo     null     N
+         * null    foo      N
+         * foo     foo      Y
+         * (1,1)   (1,1)    Y
+         * (1,)    (1,)     N
+         * (1,2)   (1,2)    Y
+         *
+         *
+         * @param o the object to compare with; null or an object of a different class is never equal
+         * @return true if this key and {@code o} are equal according to the tables above
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            IndexedKey that = (IndexedKey) o;
+            if (index == that.index) {
+                if (key == null && that.key == null) {
+                    return true;
+                } else if (key == null || that.key == null) {
+                    return false;
+                } else{
+                    return key.equals(that.key);
+                }
+            } else {
+                if (key == null || that.key == null) {
+                    return false;
+                } else if (key.equals(that.key) && !containNullfields(key)) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+
+        private boolean containNullfields(Object key) { // true only for a Tuple with a null top-level field
+            if (key instanceof Tuple) {
+                for (int i = 0; i < ((Tuple) key).size(); i++) {
+                    try {
+                        if (((Tuple) key).get(i) == null) {
+                            return true;
+                        }
+                    } catch (ExecException e) {
+                        throw new RuntimeException("exception found in " +
+                                "containNullfields", e);
+
+                    }
+                }
+            }
+            return false;
+
+        }
+
+        /**
+         * Calculate hashCode from the index or the key:
+         * if key is null, return the index value;
+         * if key is not null, return key.hashCode(), so equal keys hash alike whatever their index.
+         */
+        @Override
+        public int hashCode() {
+            int result = 0;
+            if (key == null) {
+                result = (int) index;
+            }else {
+                result =  key.hashCode();
+            }
+            return result;
+        }
+    }
+
     private static class ToKeyValueFunction implements
-            Function<Tuple, Tuple2<Object, Tuple>>, Serializable {
+            Function<Tuple, Tuple2<IndexedKey, Tuple>>, Serializable {
 
         @Override
-        public Tuple2<Object, Tuple> call(Tuple t) {
+        public Tuple2<IndexedKey, Tuple> call(Tuple t) {
             try {
                 // (index, key, value)
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ToKeyValueFunction in " + t);
                 }
-                Object key = t.get(1);
+                IndexedKey indexedKey = new IndexedKey((Byte) t.get(0), t.get(1));
                 Tuple value = (Tuple) t.get(2); // value
                 // (key, value)
-                Tuple2<Object, Tuple> out = new Tuple2<Object, Tuple>(key,
+                Tuple2<IndexedKey, Tuple> out = new Tuple2<IndexedKey, Tuple>(indexedKey,
                         value);
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ToKeyValueFunction out " + out);
@@ -315,15 +429,15 @@ public class GlobalRearrangeConverter im
     }
 
     private static class ToGroupKeyValueFunction implements
-            Function<Tuple2<Object, Seq<Seq<Tuple>>>, Tuple>, Serializable {
+            Function<Tuple2<IndexedKey, Seq<Seq<Tuple>>>, Tuple>, Serializable {
 
         @Override
-        public Tuple call(Tuple2<Object, Seq<Seq<Tuple>>> input) {
+        public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
             try {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("ToGroupKeyValueFunction2 in " + input);
                 }
-                final Object key = input._1();
+                final Object key = input._1().getKey();
                 Object obj = input._2();
                 // XXX this is a hack for Spark 1.1.0: the type is an Array, not Seq
                 Seq<Tuple>[] bags = (Seq<Tuple>[])obj;

Added: pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java?rev=1682365&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java (added)
+++ pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java Fri May 29 04:57:24 2015
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.pig.spark;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.converter
+        .GlobalRearrangeConverter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(JUnit4.class)
+public class TestIndexedKey {
+    // Checks IndexedKey.equals()/hashCode() for same-index and different-index pairs; asserts are (expected, actual).
+    /** Case 1: compare IndexedKeys that have the same index value.
+     * key1    key2    equal?  hashCode1        hashCode2
+     * foo     null      N     hashCode(foo)    index
+     * null    foo       N     index            hashCode(foo)
+     * foo     foo       Y     hashCode(foo)    hashCode(foo)
+     * null    null      Y     index            index
+     * (1,1)   (1,1)     Y     hashCode((1,1))  hashCode((1,1))
+     * (1,)    (1,)      Y     hashCode((1,))   hashCode((1,))
+     * (1,1)   (1,2)     N     hashCode((1,1))  hashCode((1,2))
+     */
+    @Test
+    public void testIndexedKeyWithSameIndexValue() throws Exception {
+        GlobalRearrangeConverter.IndexedKey a0 = new GlobalRearrangeConverter.IndexedKey((byte) 0, "foo");
+        GlobalRearrangeConverter.IndexedKey a1 = new GlobalRearrangeConverter.IndexedKey((byte) 0, null);
+        assertEquals(false, a0.equals(a1));
+        assertEquals(false, a0.hashCode() == a1.hashCode());
+
+        GlobalRearrangeConverter.IndexedKey a2 = new GlobalRearrangeConverter.IndexedKey((byte) 0, null);
+        GlobalRearrangeConverter.IndexedKey a3 = new GlobalRearrangeConverter.IndexedKey((byte) 0, "foo");
+        assertEquals(false, a2.equals(a3));
+        assertEquals(false, a2.hashCode() == a3.hashCode());
+
+        GlobalRearrangeConverter.IndexedKey a4 = new GlobalRearrangeConverter.IndexedKey((byte) 0, "foo");
+        GlobalRearrangeConverter.IndexedKey a5 = new GlobalRearrangeConverter.IndexedKey((byte) 0, "foo");
+        assertEquals(true, a4.equals(a5));
+        assertEquals(true, a4.hashCode() == a5.hashCode());
+
+        GlobalRearrangeConverter.IndexedKey a6 = new GlobalRearrangeConverter.IndexedKey((byte) 0, null);
+        GlobalRearrangeConverter.IndexedKey a7 = new GlobalRearrangeConverter.IndexedKey((byte) 0, null);
+        assertEquals(true, a6.equals(a7));
+        assertEquals(true, a6.hashCode() == a7.hashCode());
+
+        Tuple t1 = TupleFactory.getInstance().newTuple(2);
+        t1.set(0, "1");
+        t1.set(1, "1");
+        Tuple t2 = TupleFactory.getInstance().newTuple(2);
+        t2.set(0, "1");
+        t2.set(1, "1");
+        GlobalRearrangeConverter.IndexedKey a8 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t1);
+        GlobalRearrangeConverter.IndexedKey a9 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t2);
+        assertEquals(true, a8.equals(a9));
+        assertEquals(true, a8.hashCode() == a9.hashCode());
+
+        Tuple t3 = TupleFactory.getInstance().newTuple(2);
+        t3.set(0, "1");
+        t3.set(1, null);
+        Tuple t4 = TupleFactory.getInstance().newTuple(2);
+        t4.set(0, "1");
+        t4.set(1, null);
+        GlobalRearrangeConverter.IndexedKey a10 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t3);
+        GlobalRearrangeConverter.IndexedKey a11 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t4);
+        assertEquals(true, a10.equals(a11));
+        assertEquals(true, a10.hashCode() == a11.hashCode());
+
+        Tuple t5 = TupleFactory.getInstance().newTuple(2);
+        t5.set(0, "1");
+        t5.set(1, "1");
+        Tuple t6 = TupleFactory.getInstance().newTuple(2);
+        t6.set(0, "1");
+        t6.set(1, "2");
+        GlobalRearrangeConverter.IndexedKey a12 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t5);
+        GlobalRearrangeConverter.IndexedKey a13 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t6);
+        assertEquals(false, a12.equals(a13));
+        assertEquals(false, a12.hashCode() == a13.hashCode());
+    }
+
+    /**
+     * Case 2: compare IndexedKeys that have different index values.
+     * key1    key2    equal?  hashCode1        hashCode2
+     * foo     null     N      hashCode(foo)    index2
+     * null    foo      N      index1           hashCode(foo)
+     * foo     foo      Y      hashCode(foo)    hashCode(foo)
+     * null    null     N      index1           index2
+     * (1,1)   (1,1)    Y      hashCode((1,1))  hashCode((1,1))
+     * (1,)    (1,)     N      hashCode((1,))   hashCode((1,))
+     * (1,1)   (1,2)    N      hashCode((1,1))  hashCode((1,2))
+     */
+    @Test
+    public void testIndexedKeyWithDifferentIndexValue() throws Exception {
+        GlobalRearrangeConverter.IndexedKey a0 = new GlobalRearrangeConverter.IndexedKey((byte) 0, "foo");
+        GlobalRearrangeConverter.IndexedKey a1 = new GlobalRearrangeConverter.IndexedKey((byte) 1, null);
+        assertEquals(false, a0.equals(a1));
+        assertEquals(false, a0.hashCode() == a1.hashCode());
+
+        GlobalRearrangeConverter.IndexedKey a2 = new GlobalRearrangeConverter.IndexedKey((byte) 0, null);
+        GlobalRearrangeConverter.IndexedKey a3 = new GlobalRearrangeConverter.IndexedKey((byte) 1, "foo");
+        assertEquals(false, a2.equals(a3));
+        assertEquals(false, a2.hashCode() == a3.hashCode());
+
+        GlobalRearrangeConverter.IndexedKey a4 = new GlobalRearrangeConverter.IndexedKey((byte) 0, "foo");
+        GlobalRearrangeConverter.IndexedKey a5 = new GlobalRearrangeConverter.IndexedKey((byte) 1, "foo");
+        assertEquals(true, a4.equals(a5));
+        assertEquals(true, a4.hashCode() == a5.hashCode());
+
+        GlobalRearrangeConverter.IndexedKey a6 = new GlobalRearrangeConverter.IndexedKey((byte) 0, null);
+        GlobalRearrangeConverter.IndexedKey a7 = new GlobalRearrangeConverter.IndexedKey((byte) 1, null);
+        assertEquals(false, a6.equals(a7));
+        assertEquals(false, a6.hashCode() == a7.hashCode());
+
+        Tuple t1 = TupleFactory.getInstance().newTuple(2);
+        t1.set(0, "1");
+        t1.set(1, "1");
+        Tuple t2 = TupleFactory.getInstance().newTuple(2);
+        t2.set(0, "1");
+        t2.set(1, "1");
+        GlobalRearrangeConverter.IndexedKey a8 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t1);
+        GlobalRearrangeConverter.IndexedKey a9 = new GlobalRearrangeConverter.IndexedKey((byte) 1, t2);
+        assertEquals(true, a8.equals(a9));
+        assertEquals(true, a8.hashCode() == a9.hashCode());
+
+        Tuple t3 = TupleFactory.getInstance().newTuple(2);
+        t3.set(0, "1");
+        t3.set(1, null);
+        Tuple t4 = TupleFactory.getInstance().newTuple(2);
+        t4.set(0, "1");
+        t4.set(1, null);
+        GlobalRearrangeConverter.IndexedKey a10 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t3);
+        GlobalRearrangeConverter.IndexedKey a11 = new GlobalRearrangeConverter.IndexedKey((byte) 1, t4);
+        assertEquals(false, a10.equals(a11));
+        assertEquals(true, a10.hashCode() == a11.hashCode()); // same hash, yet unequal: a key with a null field never matches across indexes
+
+        Tuple t5 = TupleFactory.getInstance().newTuple(2);
+        t5.set(0, "1");
+        t5.set(1, "1");
+        Tuple t6 = TupleFactory.getInstance().newTuple(2);
+        t6.set(0, "1");
+        t6.set(1, "2");
+        GlobalRearrangeConverter.IndexedKey a12 = new GlobalRearrangeConverter.IndexedKey((byte) 0, t5);
+        GlobalRearrangeConverter.IndexedKey a13 = new GlobalRearrangeConverter.IndexedKey((byte) 1, t6);
+        assertEquals(false, a12.equals(a13));
+        assertEquals(false, a12.hashCode() == a13.hashCode());
+    }
+}

Modified: pig/branches/spark/test/spark-local-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/spark-local-tests?rev=1682365&r1=1682364&r2=1682365&view=diff
==============================================================================
--- pig/branches/spark/test/spark-local-tests (original)
+++ pig/branches/spark/test/spark-local-tests Fri May 29 04:57:24 2015
@@ -82,3 +82,4 @@
 **/TestStoreLocal.java
 **/TestMultiQueryLocal.java
 **/TestUnionOnSchema.java
+**/TestIndexedKey.java