You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/08/23 05:27:17 UTC

svn commit: r1160536 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java test/org/apache/pig/test/TestHBaseStorage.java

Author: dvryaboy
Date: Tue Aug 23 03:27:17 2011
New Revision: 1160536

URL: http://svn.apache.org/viewvc?rev=1160536&view=rev
Log:
PIG-2193: Using HBaseStorage to scan 2 tables in the same Map job produces bad data

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1160536&r1=1160535&r2=1160536&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 23 03:27:17 2011
@@ -114,6 +114,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
 
 BUG FIXES
 
+PIG-2193: Using HBaseStorage to scan 2 tables in the same Map job produces bad data (rangadi via dvryaboy)
+
 PIG-2232: "declare" document contains a typo (daijy)
 
 PIG-2055: inconsistent behavior in parser generated during build (thejas)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1160536&r1=1160535&r2=1160536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Aug 23 03:27:17 2011
@@ -160,7 +160,6 @@ public class HBaseStorage extends LoadFu
 
     private ResourceSchema schema_;
     private RequiredFieldList requiredFieldList;
-    private boolean initialized = false;
 
     private static void populateValidOptions() { 
         validOptions_.addOption("loadKey", false, "Load Key");
@@ -275,6 +274,21 @@ public class HBaseStorage extends LoadFu
     }
 
     /**
+     * Returns UDFProperties based on <code>contextSignature</code>.
+     */
+    private Properties getUDFProperties() {
+        return UDFContext.getUDFContext()
+            .getUDFProperties(this.getClass(), new String[] {contextSignature});
+    }
+
+    /**
+     * @return <code> contextSignature + "_projectedFields" </code>
+     */
+    private String projectedFieldsName() {
+        return contextSignature + "_projectedFields";
+    }
+
+    /**
      *
      * @param columnList
      * @param delimiter
@@ -413,17 +427,6 @@ public class HBaseStorage extends LoadFu
     @Override
     public Tuple getNext() throws IOException {
         try {
-            if (!initialized) {
-                Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
-                        new String[] {contextSignature});
-
-                String projectedFields = p.getProperty(contextSignature+"_projectedFields");
-                if (projectedFields != null) {
-                    requiredFieldList = (RequiredFieldList) ObjectSerializer.deserialize(projectedFields);
-                    pushProjection(requiredFieldList);
-                }
-                initialized = true;
-            }
             if (reader.nextKeyValue()) {
                 ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
                 .getCurrentKey();
@@ -546,9 +549,10 @@ public class HBaseStorage extends LoadFu
         m_table.setScannerCaching(caching_);
         m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
 
-        // Set up scan if it is not already set up.
-        if (m_conf.get(TableInputFormat.SCAN) != null) {
-            return;
+        String projectedFields = getUDFProperties().getProperty( projectedFieldsName() );
+        if (projectedFields != null) {
+            // update columnInfo_
+            pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
         }
 
         for (ColumnInfo columnInfo : columnInfo_) {
@@ -619,6 +623,8 @@ public class HBaseStorage extends LoadFu
             throw new IOException("Bad Caster " + caster_.getClass());
         }
         schema_ = s;
+        getUDFProperties().setProperty(contextSignature + "_schema",
+                                       ObjectSerializer.serialize(schema_));
     }
 
     // Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
@@ -631,15 +637,6 @@ public class HBaseStorage extends LoadFu
     @SuppressWarnings("unchecked")
     @Override
     public void putNext(Tuple t) throws IOException {
-        if (!initialized) {
-            Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
-                    new String[] {contextSignature});
-            String serializedSchema = p.getProperty(contextSignature + "_schema");
-            if (serializedSchema!= null) {
-                schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
-            }
-            initialized = true;
-        }
         ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
         byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType();
         long ts=System.currentTimeMillis();
@@ -748,10 +745,12 @@ public class HBaseStorage extends LoadFu
         }else{
             job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
         }
-        Properties props = UDFContext.getUDFContext().getUDFProperties(getClass(), new String[]{contextSignature});
-        if (!props.containsKey(contextSignature + "_schema")) {
-            props.setProperty(contextSignature + "_schema",  ObjectSerializer.serialize(schema_));
-    }
+
+        String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
+        if (serializedSchema!= null) {
+            schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
+        }
+
         m_conf = HBaseConfiguration.addHbaseResources(job.getConfiguration());
     }
 
@@ -774,6 +773,21 @@ public class HBaseStorage extends LoadFu
         List<RequiredField>  requiredFields = requiredFieldList.getFields();
         List<ColumnInfo> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
 
+        if (this.requiredFieldList != null) {
+            // in addition to PIG, this is also called by this.setLocation().
+            LOG.debug("projection is already set. skipping.");
+            return new RequiredFieldResponse(true);
+        }
+
+        /* How projection is handled :
+         *  - pushProjection() is invoked by PIG on the front end
+         *  - pushProjection here both stores serialized projection in the
+         *    context and adjusts columnInfo_.
+         *  - setLocation() is invoked on the backend and it reads the
+         *    projection from context. setLocation invokes this method again
+         *    so that columnInfo_ is adjusted.
+         */
+
         // colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
         // (row key is not a real column)
         int colOffset = loadRowKey_ ? 1 : 0;
@@ -786,7 +800,15 @@ public class HBaseStorage extends LoadFu
             throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load.");
         }
 
-        if (loadRowKey_ &&
+        // remember the projection
+        try {
+            getUDFProperties().setProperty( projectedFieldsName(),
+                    ObjectSerializer.serialize(requiredFieldList) );
+        } catch (IOException e) {
+            throw new FrontendException(e);
+        }
+
+       if (loadRowKey_ &&
                 ( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
                 loadRowKey_ = false;
             projOffset = 0;

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1160536&r1=1160535&r2=1160536&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Tue Aug 23 03:27:17 2011
@@ -843,6 +843,43 @@ public class TestHBaseStorage {
         Assert.assertEquals(100, i);
     }
 
+    /**
+     * Test if HBaseStorage handles different scans in a single MR job.
+     * This can happen when PIG loads two different aliases (as in a join or
+     * union).
+     */
+    @Test
+    public void testHeterogeneousScans() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, true, DataFormat.UTF8PlainText);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        pig.registerQuery(String.format(
+              " b = load 'hbase://%s' using %s('%s %s') as (col_a:int, col_c);",
+              TESTTABLE_2, "org.apache.pig.backend.hadoop.hbase.HBaseStorage",
+              TESTCOLUMN_A, TESTCOLUMN_C));
+        pig.registerQuery(" c = join a by col_a, b by col_a; ");
+        // this results in a single job with mappers loading
+        // different HBaseStorage specs.
+
+        Iterator<Tuple> it = pig.openIterator("c");
+        int index = 0;
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            String rowKey = (String) t.get(0);
+            int col_a = (Integer) t.get(1);
+            Assert.assertNotNull(t.get(2));
+            double col_b = (Double) t.get(2);
+            String col_c = (String) t.get(3);
+
+            Assert.assertEquals("00".substring((index + "").length()) + index,
+                    rowKey);
+            Assert.assertEquals(index, col_a);
+            Assert.assertEquals(index + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + index, col_c);
+            index++;
+        }
+        Assert.assertEquals(index, TEST_ROW_COUNT);
+    }
 
     private void scanTable1(PigServer pig, DataFormat dataFormat) throws IOException {
         scanTable1(pig, dataFormat, "");