You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/01/26 22:25:31 UTC
svn commit: r903423 - in /hadoop/pig/branches/load-store-redesign:
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
test/org/apache/pig/test/
Author: pradeepkth
Date: Tue Jan 26 21:25:31 2010
New Revision: 903423
URL: http://svn.apache.org/viewvc?rev=903423&view=rev
Log:
PIG-1090: additional patch (daijy via pradeepkth)
Modified:
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/ResourceSchema.java Tue Jan 26 21:25:31 2010
@@ -149,6 +149,27 @@
}
}
+ /**
+ * Builds a ResourceSchema from a Pig schema and, when sort information is
+ * available, records the sort key column indexes and their orders.
+ *
+ * @param pigSchema Pig schema to convert (handled by ResourceSchema(Schema))
+ * @param sortInfo sort information for the data; if null or empty, the sort
+ *        key fields are left as initialized by ResourceSchema(Schema)
+ */
+ public ResourceSchema(Schema pigSchema, SortInfo sortInfo) {
+ this(pigSchema);
+ if (sortInfo == null || sortInfo.getSortColInfoList().isEmpty()) {
+ return;
+ }
+ java.util.List<SortColInfo> sortCols = sortInfo.getSortColInfoList();
+ int numKeys = sortCols.size();
+ sortKeys = new int[numKeys];
+ sortKeyOrders = new Order[numKeys];
+ for (int k = 0; k < numKeys; k++) {
+ SortColInfo col = sortCols.get(k);
+ sortKeys[k] = col.getColIndex();
+ // Translate SortColInfo.Order into ResourceSchema.Order; any value other
+ // than ASCENDING is treated as DESCENDING.
+ sortKeyOrders[k] = (col.getSortOrder() == org.apache.pig.SortColInfo.Order.ASCENDING)
+ ? Order.ASCENDING
+ : Order.DESCENDING;
+ }
+ }
+
public int getVersion() {
return version;
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Tue Jan 26 21:25:31 2010
@@ -444,17 +444,17 @@
for (POStore st: mapStores) {
storeLocations.add(st);
StoreFunc sFunc = st.getStoreFunc();
- //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
if (st.getSchema()!=null)
- sFunc.checkSchema(new ResourceSchema(st.getSchema()));
+ sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo()));
}
for (POStore st: reduceStores) {
storeLocations.add(st);
StoreFunc sFunc = st.getStoreFunc();
- //sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
+ sFunc.setStoreLocation(st.getSFile().getFileName(), nwJob);
if (st.getSchema()!=null)
- sFunc.checkSchema(new ResourceSchema(st.getSchema()));
+ sFunc.checkSchema(new ResourceSchema(st.getSchema(), st.getSortInfo()));
}
// the OutputFormat we report to Hadoop is always PigOutputFormat
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Tue Jan 26 21:25:31 2010
@@ -159,7 +159,7 @@
Schema schema = store.getSchema();
if (schema != null) {
((StoreMetadata) storeFunc).storeSchema(
- new ResourceSchema(schema), store.getSFile()
+ new ResourceSchema(schema, store.getSortInfo()), store.getSFile()
.getFileName(), conf);
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Tue Jan 26 21:25:31 2010
@@ -1630,6 +1630,23 @@
// check limit's predecessor
if(op.get(0) instanceof LOLimit) {
op = loStore.getPlan().getPredecessors(op.get(0));
+ } else if (op.get(0) instanceof LOSplitOutput) {
+ LOSplitOutput splitOutput = (LOSplitOutput)op.get(0);
+ // We assume this is the LOSplitOutput we injected for this case:
+ // b = order a by $0; store b into '1'; store b into '2';
+ // In this case, we should mark both '1' and '2' as sorted
+ LogicalPlan conditionPlan = splitOutput.getConditionPlan();
+ if (conditionPlan.getRoots().size()==1) {
+ LogicalOperator root = conditionPlan.getRoots().get(0);
+ if (root instanceof LOConst) {
+ Object value = ((LOConst)root).getValue();
+ if (value instanceof Boolean && (Boolean)value==true) {
+ LogicalOperator split = splitOutput.getPlan().getPredecessors(splitOutput).get(0);
+ if (split instanceof LOSplit)
+ op = loStore.getPlan().getPredecessors(split);
+ }
+ }
+ }
}
PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0));
// if this predecessor is a sort, get
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogToPhyCompiler.java Tue Jan 26 21:25:31 2010
@@ -34,6 +34,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
import org.apache.pig.SortColInfo;
import org.apache.pig.SortInfo;
import org.apache.pig.backend.executionengine.ExecException;
@@ -704,6 +705,41 @@
}
/**
+ * Tests that sortInfo is set on every POStore when the same ordered
+ * relation is stored more than once, e.g.
+ * b = order a by ...; store b into '1'; store b into '2';
+ * @throws Exception
+ */
+ @Test
+ public void testSortInfoMultipleStore() throws Exception {
+ PigServer myPig = new PigServer(ExecType.MAPREDUCE);
+ myPig.setBatchOn();
+ myPig.registerQuery("a = load 'bla' as (i:int, n:chararray, d:double);");
+ myPig.registerQuery("b = order a by i, d desc;");
+ myPig.registerQuery("store b into '1';");
+ myPig.registerQuery("store b into '2';");
+ // compileLp is not accessible from the test, so it is invoked reflectively.
+ // NOTE(review): the null argument is assumed to mean "no specific alias";
+ // confirm against PigServer.compileLp.
+ java.lang.reflect.Method compileLp = myPig.getClass()
+ .getDeclaredMethod("compileLp", new Class[] { String.class });
+ compileLp.setAccessible(true);
+ LogicalPlan lp = (LogicalPlan) compileLp.invoke(myPig, new Object[] { null });
+
+ PhysicalPlan pp = buildPhysicalPlan(lp);
+ // One POStore leaf is expected per store statement; check before indexing.
+ assertEquals(2, pp.getLeaves().size());
+ SortInfo si0 = ((POStore) pp.getLeaves().get(0)).getSortInfo();
+ SortInfo si1 = ((POStore) pp.getLeaves().get(1)).getSortInfo();
+ SortInfo expected = getSortInfo(
+ Arrays.asList(new String[] {"i", "d"}),
+ Arrays.asList(new Integer[] {0, 2}),
+ Arrays.asList(new SortColInfo.Order[] {
+ SortColInfo.Order.ASCENDING,
+ SortColInfo.Order.DESCENDING}));
+ assertEquals(expected, si0);
+ assertEquals(expected, si1);
+ }
+
+ /**
* tests that sortInfo is null when there is no schema for order by
* before the store
* @throws Exception
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java?rev=903423&r1=903422&r2=903423&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestResourceSchema.java Tue Jan 26 21:25:31 2010
@@ -21,7 +21,12 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.pig.ResourceSchema;
+import org.apache.pig.SortColInfo;
+import org.apache.pig.SortInfo;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
@@ -82,6 +87,40 @@
}
/**
+ * Test that a ResourceSchema built with a SortInfo converts back to the
+ * original Pig schema and carries the sort key column indexes and orders.
+ */
+ @Test
+ public void testResourceFlatSchemaCreationWithSortInfo()
+ throws ExecException, SchemaMergeException, FrontendException {
+ String [] aliases ={"f1", "f2"};
+ byte[] types = {DataType.CHARARRAY, DataType.INTEGER};
+
+ Schema origSchema = new Schema(
+ new Schema.FieldSchema("t1",
+ new Schema(
+ new Schema.FieldSchema("t0",
+ TypeCheckingTestUtil.genFlatSchema(
+ aliases,types),
+ DataType.TUPLE)), DataType.BAG));
+ List<SortColInfo> colList = new ArrayList<SortColInfo>();
+ SortColInfo col1 = new SortColInfo("f1", 0, SortColInfo.Order.ASCENDING);
+ // Each sort column gets its own alias (previously both were "f1").
+ SortColInfo col2 = new SortColInfo("f2", 1, SortColInfo.Order.DESCENDING);
+ colList.add(col1);
+ colList.add(col2);
+ SortInfo sortInfo = new SortInfo(colList);
+
+ ResourceSchema rsSchema = new ResourceSchema(origSchema, sortInfo);
+
+ Schema genSchema = Schema.getPigSchema(rsSchema);
+ assertTrue("generated schema equals original",
+ Schema.equals(genSchema, origSchema, true, false));
+ // assertEquals reports expected/actual values on failure, unlike assertTrue(a==b).
+ assertEquals(2, rsSchema.getSortKeys().length);
+ assertEquals(2, rsSchema.getSortKeyOrders().length);
+ assertEquals(0, rsSchema.getSortKeys()[0]);
+ assertEquals(1, rsSchema.getSortKeys()[1]);
+ assertEquals(ResourceSchema.Order.ASCENDING, rsSchema.getSortKeyOrders()[0]);
+ assertEquals(ResourceSchema.Order.DESCENDING, rsSchema.getSortKeyOrders()[1]);
+ }
+
+ /**
* Test that Pig Schema is correctly created given a
* ResourceSchema and vice versa. Test also that
* TwoLevelAccess flag is set for Pig Schema when needed.