You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/01/15 05:42:15 UTC

svn commit: r1724727 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java src/org/apache/pig/data/SelfSpillBag.java test/org/apache/pig/test/TestHBaseStorage.java

Author: xuefu
Date: Fri Jan 15 04:42:14 2016
New Revision: 1724727

URL: http://svn.apache.org/viewvc?rev=1724727&view=rev
Log:
Fix remaining unit test failures about TestHBaseStorage in spark mode (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
    pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1724727&r1=1724726&r2=1724727&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Jan 15 04:42:14 2016
@@ -301,8 +301,15 @@ public class HBaseStorage extends LoadFu
         }
 
         columnInfo_ = parseColumnList(columnList, delimiter_, ignoreWhitespace_);
-
-        String defaultCaster = UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER);
+        //In MapReduce (mr) mode, UDFContext.deserialize is called first and then UDFContext.getUDFContext().getClientSystemProps() is called,
+        //so the value is not null.
+        //In Spark mode, when the Spark executor first initializes all
+        //the objects, UDFContext.getUDFContext().getClientSystemProps() is null; UDFContext.deserialize is called only afterwards.
+        //So we need to check whether UDFContext.getUDFContext().getClientSystemProps()
+        //is null: if it is null, defaultCaster = STRING_CASTER; otherwise it is
+        //UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER).
+        //For details see PIG-4611.
+        String defaultCaster = UDFContext.getUDFContext().getClientSystemProps() != null ? UDFContext.getUDFContext().getClientSystemProps().getProperty(CASTER_PROPERTY, STRING_CASTER) : STRING_CASTER;
         String casterOption = configuredOptions_.getOptionValue("caster", defaultCaster);
         if (STRING_CASTER.equalsIgnoreCase(casterOption)) {
             caster_ = new Utf8StorageConverter();

Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1724727&r1=1724726&r2=1724727&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Fri Jan 15 04:42:14 2016
@@ -29,7 +29,8 @@ import org.apache.pig.classification.Int
 @InterfaceStability.Evolving
 public abstract class SelfSpillBag extends DefaultAbstractBag {
     private static final long serialVersionUID = 1L;
-    protected MemoryLimits memLimit;
+    //In Spark mode, if we don't make memLimit transient, it will throw NotSerializableException (see PIG-4611)
+    protected transient MemoryLimits memLimit;
 
     public SelfSpillBag(int bagCount) {
         memLimit = new MemoryLimits(bagCount, -1);

Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1724727&r1=1724726&r2=1724727&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Fri Jan 15 04:42:14 2016
@@ -18,6 +18,7 @@ package org.apache.pig.test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +47,7 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -1387,23 +1389,20 @@ public class TestHBaseStorage {
 
         Iterator<Tuple> it = pig.openIterator("c");
         int index = 0;
-        while (it.hasNext()) {
-            Tuple t = it.next();
-            String rowKey = (String) t.get(0);
-            int col_a = (Integer) t.get(1);
-            Assert.assertNotNull(t.get(2));
-            double col_b = (Double) t.get(2);
-            String col_c = (String) t.get(3);
-
-            Assert.assertEquals("00".substring((index + "").length()) + index,
-                    rowKey);
-            Assert.assertEquals(index, col_a);
-            Assert.assertEquals(index + 0.0, col_b, 1e-6);
-            Assert.assertEquals("Text_" + index, col_c);
+        List<Tuple> expected = new ArrayList<Tuple>();
+        while (index<TEST_ROW_COUNT) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append("00".substring((index + "").length()) + index);
+            t.append(index);
+            t.append(index + 0.0);
+            t.append("Text_" + index);
+            t.append(index);
+            t.append(new DataByteArray("Text_" + index));
             index++;
+            expected.add(t);
         }
-        Assert.assertEquals(index, TEST_ROW_COUNT);
-    }
+        Util.checkQueryOutputsAfterSort(it, expected);
+}
 
     @Test
     // See PIG-4151