You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/08/23 05:27:17 UTC
svn commit: r1160536 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
test/org/apache/pig/test/TestHBaseStorage.java
Author: dvryaboy
Date: Tue Aug 23 03:27:17 2011
New Revision: 1160536
URL: http://svn.apache.org/viewvc?rev=1160536&view=rev
Log:
PIG-2193: Using HBaseStorage to scan 2 tables in the same Map job produces bad data
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1160536&r1=1160535&r2=1160536&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 23 03:27:17 2011
@@ -114,6 +114,8 @@ PIG-2011: Speed up TestTypedMap.java (dv
BUG FIXES
+PIG-2193: Using HBaseStorage to scan 2 tables in the same Map job produces bad data (rangadi via dvryaboy)
+
PIG-2232: "declare" document contains a typo (daijy)
PIG-2055: inconsistent behavior in parser generated during build (thejas)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1160536&r1=1160535&r2=1160536&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Aug 23 03:27:17 2011
@@ -160,7 +160,6 @@ public class HBaseStorage extends LoadFu
private ResourceSchema schema_;
private RequiredFieldList requiredFieldList;
- private boolean initialized = false;
private static void populateValidOptions() {
validOptions_.addOption("loadKey", false, "Load Key");
@@ -275,6 +274,21 @@ public class HBaseStorage extends LoadFu
}
/**
+ * Returns UDFProperties based on <code>contextSignature</code>.
+ */
+ private Properties getUDFProperties() {
+ return UDFContext.getUDFContext()
+ .getUDFProperties(this.getClass(), new String[] {contextSignature});
+ }
+
+ /**
+ * @return <code> contextSignature + "_projectedFields" </code>
+ */
+ private String projectedFieldsName() {
+ return contextSignature + "_projectedFields";
+ }
+
+ /**
*
* @param columnList
* @param delimiter
@@ -413,17 +427,6 @@ public class HBaseStorage extends LoadFu
@Override
public Tuple getNext() throws IOException {
try {
- if (!initialized) {
- Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
- new String[] {contextSignature});
-
- String projectedFields = p.getProperty(contextSignature+"_projectedFields");
- if (projectedFields != null) {
- requiredFieldList = (RequiredFieldList) ObjectSerializer.deserialize(projectedFields);
- pushProjection(requiredFieldList);
- }
- initialized = true;
- }
if (reader.nextKeyValue()) {
ImmutableBytesWritable rowKey = (ImmutableBytesWritable) reader
.getCurrentKey();
@@ -546,9 +549,10 @@ public class HBaseStorage extends LoadFu
m_table.setScannerCaching(caching_);
m_conf.set(TableInputFormat.INPUT_TABLE, tablename);
- // Set up scan if it is not already set up.
- if (m_conf.get(TableInputFormat.SCAN) != null) {
- return;
+ String projectedFields = getUDFProperties().getProperty( projectedFieldsName() );
+ if (projectedFields != null) {
+ // update columnInfo_
+ pushProjection((RequiredFieldList) ObjectSerializer.deserialize(projectedFields));
}
for (ColumnInfo columnInfo : columnInfo_) {
@@ -619,6 +623,8 @@ public class HBaseStorage extends LoadFu
throw new IOException("Bad Caster " + caster_.getClass());
}
schema_ = s;
+ getUDFProperties().setProperty(contextSignature + "_schema",
+ ObjectSerializer.serialize(schema_));
}
// Suppressing unchecked warnings for RecordWriter, which is not parameterized by StoreFuncInterface
@@ -631,15 +637,6 @@ public class HBaseStorage extends LoadFu
@SuppressWarnings("unchecked")
@Override
public void putNext(Tuple t) throws IOException {
- if (!initialized) {
- Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(),
- new String[] {contextSignature});
- String serializedSchema = p.getProperty(contextSignature + "_schema");
- if (serializedSchema!= null) {
- schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
- }
- initialized = true;
- }
ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : schema_.getFields();
byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType();
long ts=System.currentTimeMillis();
@@ -748,10 +745,12 @@ public class HBaseStorage extends LoadFu
}else{
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, location);
}
- Properties props = UDFContext.getUDFContext().getUDFProperties(getClass(), new String[]{contextSignature});
- if (!props.containsKey(contextSignature + "_schema")) {
- props.setProperty(contextSignature + "_schema", ObjectSerializer.serialize(schema_));
- }
+
+ String serializedSchema = getUDFProperties().getProperty(contextSignature + "_schema");
+ if (serializedSchema!= null) {
+ schema_ = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
+ }
+
m_conf = HBaseConfiguration.addHbaseResources(job.getConfiguration());
}
@@ -774,6 +773,21 @@ public class HBaseStorage extends LoadFu
List<RequiredField> requiredFields = requiredFieldList.getFields();
List<ColumnInfo> newColumns = Lists.newArrayListWithExpectedSize(requiredFields.size());
+ if (this.requiredFieldList != null) {
+ // in addition to PIG, this is also called by this.setLocation().
+ LOG.debug("projection is already set. skipping.");
+ return new RequiredFieldResponse(true);
+ }
+
+ /* How projection is handled :
+ * - pushProjection() is invoked by PIG on the front end
+ * - pushProjection here both stores serialized projection in the
+ * context and adjusts columnInfo_.
+ * - setLocation() is invoked on the backend and it reads the
+ * projection from context. setLocation invokes this method again
+ * so that columnInfo_ is adjusted.
+ */
+
// colOffset is the offset in our columnList that we need to apply to indexes we get from requiredFields
// (row key is not a real column)
int colOffset = loadRowKey_ ? 1 : 0;
@@ -786,7 +800,15 @@ public class HBaseStorage extends LoadFu
throw new FrontendException("The list of columns to project from HBase is larger than HBaseStorage is configured to load.");
}
- if (loadRowKey_ &&
+ // remember the projection
+ try {
+ getUDFProperties().setProperty( projectedFieldsName(),
+ ObjectSerializer.serialize(requiredFieldList) );
+ } catch (IOException e) {
+ throw new FrontendException(e);
+ }
+
+ if (loadRowKey_ &&
( requiredFields.size() < 1 || requiredFields.get(0).getIndex() != 0)) {
loadRowKey_ = false;
projOffset = 0;
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1160536&r1=1160535&r2=1160536&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Tue Aug 23 03:27:17 2011
@@ -843,6 +843,43 @@ public class TestHBaseStorage {
Assert.assertEquals(100, i);
}
+ /**
+ * Test to if HBaseStorage handles different scans in a single MR job.
+ * This can happen PIG loads two different aliases (as in a join or
+ * union).
+ */
+ @Test
+ public void testHeterogeneousScans() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, true, DataFormat.UTF8PlainText);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ pig.registerQuery(String.format(
+ " b = load 'hbase://%s' using %s('%s %s') as (col_a:int, col_c);",
+ TESTTABLE_2, "org.apache.pig.backend.hadoop.hbase.HBaseStorage",
+ TESTCOLUMN_A, TESTCOLUMN_C));
+ pig.registerQuery(" c = join a by col_a, b by col_a; ");
+ // this results in a single job with mappers loading
+ // different HBaseStorage specs.
+
+ Iterator<Tuple> it = pig.openIterator("c");
+ int index = 0;
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ String rowKey = (String) t.get(0);
+ int col_a = (Integer) t.get(1);
+ Assert.assertNotNull(t.get(2));
+ double col_b = (Double) t.get(2);
+ String col_c = (String) t.get(3);
+
+ Assert.assertEquals("00".substring((index + "").length()) + index,
+ rowKey);
+ Assert.assertEquals(index, col_a);
+ Assert.assertEquals(index + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + index, col_c);
+ index++;
+ }
+ Assert.assertEquals(index, TEST_ROW_COUNT);
+ }
private void scanTable1(PigServer pig, DataFormat dataFormat) throws IOException {
scanTable1(pig, dataFormat, "");