Posted to commits@pig.apache.org by ga...@apache.org on 2008/12/11 23:29:29 UTC
svn commit: r725846 [1/3] - in /hadoop/pig/branches/types:
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/executionengine/physica...
Author: gates
Date: Thu Dec 11 14:29:29 2008
New Revision: 725846
URL: http://svn.apache.org/viewvc?rev=725846&view=rev
Log:
PIG-543. Add a true local mode instead of using the local map reduce cluster. Contributed by Shubham.
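The added tests below drive the new mode by constructing a PigServer directly in local mode. A minimal usage sketch (the input path and the queries are hypothetical; only the "local" constructor argument comes from the added tests):

    import java.util.Iterator;

    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class LocalModeSketch {
        public static void main(String[] args) throws Exception {
            // "local" selects the new true local execution engine
            // instead of a local map-reduce cluster
            PigServer pig = new PigServer("local");
            pig.registerQuery("a = load 'input.txt';");
            pig.registerQuery("b = group a all;");
            // openIterator pulls results through the local physical plan
            Iterator<Tuple> it = pig.openIterator("b");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }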
Added:
hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java
hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipelineLocal.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestLocal.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestLocal2.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestPOCross.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestStreamingLocal.java
Modified:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Dec 11 14:29:29 2008
@@ -71,7 +71,7 @@
private Log log = LogFactory.getLog(getClass());
- PigContext pc;
+ protected PigContext pc;
LoadFunc load;
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Dec 11 14:29:29 2008
@@ -261,5 +261,10 @@
}
+ public void visitCross(POCross cross) {
+ // TODO Auto-generated method stub
+
+ }
+
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Dec 11 14:29:29 2008
@@ -45,15 +45,15 @@
private StreamingCommand command; // Actual command to be run
private Properties properties;
- private boolean initialized = false;
+ protected boolean initialized = false;
- private BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
+ protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
- private BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
+ protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
- private boolean allInputFromPredecessorConsumed = false;
+ protected boolean allInputFromPredecessorConsumed = false;
- private boolean allOutputFromBinaryProcessed = false;
+ protected boolean allOutputFromBinaryProcessed = false;
public POStream(OperatorKey k, ExecutableManager executableManager,
StreamingCommand command, Properties properties) {
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Dec 11 14:29:29 2008
@@ -54,29 +54,30 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
import org.apache.pig.impl.plan.VisitorException;
import java.util.Iterator;
-
public class LocalExecutionEngine implements ExecutionEngine {
protected PigContext pigContext;
protected DataStorage ds;
protected NodeIdGenerator nodeIdGenerator;
- // key: the operator key from the logical plan that originated the physical plan
+ // key: the operator key from the logical plan that originated the physical
+ // plan
// val: the operator key for the root of the physical plan
protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
-
+
protected Map<OperatorKey, PhysicalOperator> physicalOpTable;
-
+
// map from LOGICAL key to info about the execution
protected Map<OperatorKey, LocalResult> materializedResults;
-
+
public LocalExecutionEngine(PigContext pigContext) {
this.pigContext = pigContext;
this.ds = pigContext.getLfs();
- this.nodeIdGenerator = NodeIdGenerator.getGenerator();
+ this.nodeIdGenerator = NodeIdGenerator.getGenerator();
this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
this.physicalOpTable = new HashMap<OperatorKey, PhysicalOperator>();
this.materializedResults = new HashMap<OperatorKey, LocalResult>();
@@ -85,7 +86,7 @@
public DataStorage getDataStorage() {
return this.ds;
}
-
+
public void init() throws ExecException {
;
}
@@ -93,30 +94,46 @@
public void close() throws ExecException {
;
}
-
+
public Properties getConfiguration() throws ExecException {
return this.pigContext.getProperties();
}
-
- public void updateConfiguration(Properties newConfiguration)
- throws ExecException {
+
+ public void updateConfiguration(Properties newConfiguration)
+ throws ExecException {
// there is nothing to do here.
}
-
+
public Map<String, Object> getStatistics() throws ExecException {
throw new UnsupportedOperationException();
}
-
- public PhysicalPlan compile(LogicalPlan plan,
- Properties properties) throws ExecException {
+ // public PhysicalPlan compile(LogicalPlan plan,
+ // Properties properties) throws ExecException {
+ // if (plan == null) {
+ // throw new ExecException("No Plan to compile");
+ // }
+ //
+ // try {
+ // LogToPhyTranslationVisitor translator =
+ // new LogToPhyTranslationVisitor(plan);
+ // translator.setPigContext(pigContext);
+ // translator.visit();
+ // return translator.getPhysicalPlan();
+ // } catch (VisitorException ve) {
+ // throw new ExecException(ve);
+ // }
+ // }
+
+ public PhysicalPlan compile(LogicalPlan plan, Properties properties)
+ throws ExecException {
if (plan == null) {
throw new ExecException("No Plan to compile");
}
try {
- LogToPhyTranslationVisitor translator =
- new LogToPhyTranslationVisitor(plan);
+ LocalLogToPhyTranslationVisitor translator = new LocalLogToPhyTranslationVisitor(
+ plan);
translator.setPigContext(pigContext);
translator.visit();
return translator.getPhysicalPlan();
@@ -125,42 +142,45 @@
}
}
- public ExecJob execute(PhysicalPlan plan,
- String jobName) throws ExecException {
+ public ExecJob execute(PhysicalPlan plan, String jobName)
+ throws ExecException {
try {
- PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
+ PhysicalOperator leaf = (PhysicalOperator) plan.getLeaves().get(0);
FileSpec spec = null;
- if(!(leaf instanceof POStore)){
+ if (!(leaf instanceof POStore)) {
String scope = leaf.getOperatorKey().getScope();
POStore str = new POStore(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
str.setPc(pigContext);
spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
- pigContext).toString(),
- new FuncSpec(BinStorage.class.getName()));
+ pigContext).toString(), new FuncSpec(BinStorage.class
+ .getName()));
str.setSFile(spec);
plan.addAsLeaf(str);
- }
- else{
- spec = ((POStore)leaf).getSFile();
+ } else {
+ spec = ((POStore) leaf).getSFile();
}
- LocalLauncher launcher = new LocalLauncher();
+ // LocalLauncher launcher = new LocalLauncher();
+ LocalPigLauncher launcher = new LocalPigLauncher();
boolean success = launcher.launchPig(plan, jobName, pigContext);
- if(success)
- return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+ if (success)
+ return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext,
+ spec);
else
return new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
} catch (Exception e) {
- // There are a lot of exceptions thrown by the launcher. If this
- // is an ExecException, just let it through. Else wrap it.
- if (e instanceof ExecException) throw (ExecException)e;
- else throw new ExecException(e.getMessage(), e);
+ // There are a lot of exceptions thrown by the launcher. If this
+ // is an ExecException, just let it through. Else wrap it.
+ if (e instanceof ExecException)
+ throw (ExecException) e;
+ else
+ throw new ExecException(e.getMessage(), e);
}
}
- public LocalJob submit(PhysicalPlan plan,
- String jobName) throws ExecException {
+ public LocalJob submit(PhysicalPlan plan, String jobName)
+ throws ExecException {
throw new UnsupportedOperationException();
}
@@ -172,33 +192,32 @@
ExecTools.checkLeafIsStore(plan, pigContext);
- LocalLauncher launcher = new LocalLauncher();
+ // LocalLauncher launcher = new LocalLauncher();
+ LocalPigLauncher launcher = new LocalPigLauncher();
launcher.explain(plan, pigContext, stream);
} catch (Exception ve) {
throw new RuntimeException(ve);
}
}
- public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
+ public Collection<ExecJob> runningJobs(Properties properties)
+ throws ExecException {
return new HashSet<ExecJob>();
}
-
+
public Collection<String> activeScopes() throws ExecException {
throw new UnsupportedOperationException();
}
-
+
public void reclaimScope(String scope) throws ExecException {
throw new UnsupportedOperationException();
}
-
+
private OperatorKey doCompile(OperatorKey logicalKey,
- Map<OperatorKey, LogicalOperator> logicalOpTable,
- Properties properties)
- throws ExecException {
-
+ Map<OperatorKey, LogicalOperator> logicalOpTable,
+ Properties properties) throws ExecException {
+
return null;
}
-
-}
-
+}
Added: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class LocalPigLauncher extends Launcher {
+
+ Log log = LogFactory.getLog(getClass());
+
+ @Override
+ public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps)
+ throws PlanException, VisitorException, IOException {
+ // TODO Auto-generated method stub
+ pp.explain(ps);
+ ps.append('\n');
+ }
+
+ @Override
+ public boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
+ throws PlanException, VisitorException, IOException, ExecException,
+ JobCreationException {
+ // TODO Auto-generated method stub
+ List<PhysicalOperator> stores = php.getLeaves();
+ int noJobs = stores.size();
+ int failedJobs = 0;
+
+ for (PhysicalOperator op : stores) {
+ POStore store = (POStore) op;
+ Result res = store.store();
+ if (res.returnStatus != POStatus.STATUS_EOP)
+ failedJobs++;
+ }
+
+ if (failedJobs == 0) {
+ log.info("100% complete!");
+ log.info("Success!!");
+ return true;
+ } else {
+ log.info("Failed jobs!!");
+ log.info(failedJobs + " out of " + noJobs + " failed!");
+ }
+ return false;
+
+ }
+
+}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Thu Dec 11 14:29:29 2008
@@ -26,15 +26,18 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POStreamLocal;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOCross;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
@@ -62,7 +65,7 @@
List<LogicalOperator> inputs = cg.getInputs();
POCogroup poc = new POCogroup(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
-
+ poc.setInner(cg.getInner());
currentPlan.add(poc);
int count = 0;
@@ -159,5 +162,48 @@
throw new VisitorException(e);
}
}
+
+ @Override
+ public void visit(LOStream stream) throws VisitorException {
+ String scope = stream.getOperatorKey().scope;
+ POStreamLocal poStream = new POStreamLocal(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), stream.getExecutableManager(),
+ stream.getStreamingCommand(), pc.getProperties());
+ currentPlan.add(poStream);
+ LogToPhyMap.put(stream, poStream);
+
+ List<LogicalOperator> op = stream.getPlan().getPredecessors(stream);
+
+ PhysicalOperator from = LogToPhyMap.get(op.get(0));
+ try {
+ currentPlan.connect(from, poStream);
+ } catch (PlanException e) {
+ log.error("Invalid physical operators in the physical plan"
+ + e.getMessage());
+ throw new VisitorException(e);
+ }
+ }
+
+ @Override
+ public void visit(LOCross cross) throws VisitorException {
+ String scope = cross.getOperatorKey().scope;
+
+ POCross pocross = new POCross(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+ LogToPhyMap.put(cross, pocross);
+ currentPlan.add(pocross);
+
+
+ for(LogicalOperator in : cross.getInputs()) {
+ PhysicalOperator from = LogToPhyMap.get(in);
+ try {
+ currentPlan.connect(from, pocross);
+ } catch (PlanException e) {
+ log.error("Invalid physical operators in the physical plan"
+ + e.getMessage());
+ throw new VisitorException(e);
+ }
+ }
+ //currentPlan.explain(System.out);
+ }
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Thu Dec 11 14:29:29 2008
@@ -51,6 +51,7 @@
Tuple[] data = null;
Iterator<Tuple>[] its = null;
+ boolean[] inner;
public POCogroup(OperatorKey k) {
super(k);
@@ -69,6 +70,10 @@
public POCogroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
super(k, rp, inp);
}
+
+ public void setInner(boolean[] inner) {
+ this.inner = inner;
+ }
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
@@ -149,6 +154,14 @@
res.result = output;
res.returnStatus = POStatus.STATUS_OK;
+// System.out.println(output);
+ for(int i = 0; i < size; i++) {
+ if(inner != null && inner[i] && ((DataBag)output.get(i+1)).size() == 0) {
+ res.returnStatus = POStatus.STATUS_NULL;
+ break;
+ }
+ }
+
return res;
}
@@ -160,15 +173,18 @@
for(int i = 0; i < size; i++) {
DataBag bag = new SortedDataBag(new groupComparator());
for(Result input = inputs.get(i).getNext(dummyTuple); input.returnStatus != POStatus.STATUS_EOP; input = inputs.get(i).getNext(dummyTuple)) {
- if(input.returnStatus == POStatus.STATUS_ERR) {
- throw new ExecException("Error accumulating output at local Cogroup operator");
- }
- bag.add((Tuple) input.result);
+ if(input.returnStatus == POStatus.STATUS_ERR) {
+ throw new ExecException("Error accumulating output at local Cogroup operator");
+ }
+ if(input.returnStatus == POStatus.STATUS_NULL)
+ continue;
+ bag.add((Tuple) input.result);
}
+
its[i] = bag.iterator();
data[i] = its[i].next();
}
-
+
}
// private Tuple getSmallest(Tuple[] data) {
@@ -191,7 +207,7 @@
t = data[i];
continue; //since the previous data was probably null so we don't really need a comparison
}
- if(comp.compare(t, (Tuple) data[i]) < 0)
+ if(comp.compare(t, (Tuple) data[i]) > 0)
t = data[i];
}
return t;
Added: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * This is a local implementation of the cross. It's a blocking operator.
+ * It accumulates inputs into databags and then applies logic similar to
+ * foreach flatten(*) to get the output tuples
+ *
+ * @author shubhamc
+ *
+ */
+public class POCross extends PhysicalOperator {
+
+ DataBag [] inputBags;
+ Tuple [] data;
+ Iterator [] its;
+
+ public POCross(OperatorKey k) {
+ super(k);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POCross(OperatorKey k, int rp) {
+ super(k, rp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POCross(OperatorKey k, List<PhysicalOperator> inp) {
+ super(k, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public POCross(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ // TODO Auto-generated method stub
+ v.visitCross(this);
+
+ }
+
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ Result res = new Result();
+ int noItems = inputs.size();
+ if(inputBags == null) {
+ accumulateData();
+ }
+
+ if(its != null) {
+ //we check if we are done with processing
+ //we do that by checking if all the iterators are used up
+ boolean finished = true;
+ for(int i = 0; i < its.length; i++) {
+ finished &= !its[i].hasNext();
+ }
+ if(finished) {
+ res.returnStatus = POStatus.STATUS_EOP;
+ return res;
+ }
+
+ }
+
+ if(data == null) {
+ //getNext being called for the first time or starting on new input data
+ //we instantiate the template array and start populating it with data
+ data = new Tuple[noItems];
+ for(int i = 0; i < noItems; ++i) {
+ data[i] = (Tuple) its[i].next();
+
+ }
+ res.result = CreateTuple(data);
+ res.returnStatus = POStatus.STATUS_OK;
+ return res;
+ } else {
+ for(int index = noItems - 1; index >= 0; --index) {
+ if(its[index].hasNext()) {
+ data[index] = (Tuple) its[index].next();
+ res.result = CreateTuple(data);
+ res.returnStatus = POStatus.STATUS_OK;
+ return res;
+ }
+ else{
+ // reset this index's iterator so cross product can be achieved
+ // we would be resetting this way only for the indexes from the end
+ // when the first index which needs to be flattened has reached the
+ // last element in its iterator, we won't come here - instead, we reset
+ // all iterators at the beginning of this method.
+ its[index] = (inputBags[index]).iterator();
+ data[index] = (Tuple) its[index].next();
+ }
+
+ }
+ }
+
+ return null;
+ }
+
+ private void accumulateData() throws ExecException {
+ int count = 0;
+ inputBags = new DataBag[inputs.size()];
+
+ its = new Iterator[inputs.size()];
+ for(PhysicalOperator op : inputs) {
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ inputBags[count] = bag;
+ for(Result res = op.getNext(dummyTuple); res.returnStatus != POStatus.STATUS_EOP; res = op.getNext(dummyTuple)) {
+ if(res.returnStatus == POStatus.STATUS_NULL)
+ continue;
+ if(res.returnStatus == POStatus.STATUS_ERR)
+ throw new ExecException("Error accumulating data in the local Cross operator");
+ if(res.returnStatus == POStatus.STATUS_OK)
+ bag.add((Tuple) res.result);
+ }
+ its[count++] = bag.iterator();
+ }
+ }
+
+ private Tuple CreateTuple(Tuple[] data) throws ExecException {
+ Tuple out = TupleFactory.getInstance().newTuple();
+
+ for(int i = 0; i < data.length; ++i) {
+ Tuple t = data[i];
+ int size = t.size();
+ for(int j = 0; j < size; ++j) {
+ out.append(t.get(j));
+ }
+
+ }
+
+ if(lineageTracer != null) {
+ ExampleTuple tOut = new ExampleTuple();
+ tOut.reference(out);
+ lineageTracer.insert(tOut);
+ for(int i = 0; i < data.length; i++) {
+ lineageTracer.union(tOut, data[i]);
+ }
+ return tOut;
+ }
+ return out;
+ }
+
+ @Override
+ public String name() {
+ // TODO Auto-generated method stub
+ return "POCrossLocal" + " - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ // TODO Auto-generated method stub
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+}
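The getNext() implementation above walks the accumulated bags like an odometer: the rightmost iterator advances on every call, and an exhausted iterator is reset while the next one to its left advances. A standalone sketch of that iteration order, with plain lists standing in for the DataBags (names and data are illustrative only, not part of the commit):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class CrossOrderSketch {
        public static void main(String[] args) {
            List<List<String>> inputs = Arrays.asList(
                    Arrays.asList("a1", "a2"),
                    Arrays.asList("b1", "b2", "b3"));
            int n = inputs.size();
            int[] pos = new int[n];          // current position in each input, like data[]/its[] in POCross
            boolean done = false;
            while (!done) {
                List<String> out = new ArrayList<String>();
                for (int i = 0; i < n; i++) {
                    out.add(inputs.get(i).get(pos[i]));   // append fields, as CreateTuple does
                }
                System.out.println(out);     // one output tuple of the cross product
                // advance the rightmost position; reset exhausted positions,
                // the way POCross resets its iterators
                int i = n - 1;
                while (i >= 0 && ++pos[i] == inputs.get(i).size()) {
                    pos[i] = 0;
                    i--;
                }
                done = (i < 0);
            }
        }
    }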
Added: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+public class POStreamLocal extends POStream {
+
+ /**
+ *
+ */
+ private static final long serialVersionUID = 2L;
+
+ public POStreamLocal(OperatorKey k, ExecutableManager executableManager,
+ StreamingCommand command, Properties properties) {
+ super(k, executableManager, command, properties);
+ // TODO Auto-generated constructor stub
+ }
+
+
+ /**
+ * This is different from the Map-Reduce implementation of the POStream since there is no
+ * push model here. POStatus.STATUS_EOP signals the end of input and can be used to decide when
+ * to stop writing to the stdin of the process
+ */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+ // The POStream Operator works with ExecutableManager to
+ // send input to the streaming binary and to get output
+ // from it. To achieve a tuple oriented behavior, two queues
+ // are used - one for output from the binary and one for
+ // input to the binary. In each getNext() call:
+ // 1) If there is no more output expected from the binary, an EOP is
+ // sent to successor
+ // 2) If there is any output from the binary in the queue, it is passed
+ // down to the successor
+ // 3) if neither of these two are true and if it is possible to
+ // send input to the binary, then the next tuple from the
+ // predecessor is fetched and passed to the binary
+ try {
+ // if we are being called AFTER all output from the streaming
+ // binary has already been sent to us then just return EOP
+ // The "allOutputFromBinaryProcessed" flag is set when we see
+ // an EOS (End of Stream output) from streaming binary
+ if(allOutputFromBinaryProcessed) {
+ return new Result(POStatus.STATUS_EOP, null);
+ }
+
+ // if we are here AFTER all map() calls have been completed
+ // AND AFTER we process all possible input to be sent to the
+ // streaming binary, then all we want to do is read output from
+ // the streaming binary
+ if(allInputFromPredecessorConsumed) {
+ Result r = binaryOutputQueue.take();
+ if(r.returnStatus == POStatus.STATUS_EOS) {
+ // If we received EOS, it means all output
+ // from the streaming binary has been sent to us
+ // So we can send an EOP to the successor in
+ // the pipeline. Also since we are being called
+ // after all input from predecessor has been processed
+ // it means we got here from a call from close() in
+ // map or reduce. So once we send this EOP down,
+ // getNext() in POStream should never be called. So
+ // we don't need to set any flag noting we saw all output
+ // from binary
+ r.returnStatus = POStatus.STATUS_EOP;
+ }
+ return(r);
+ }
+
+ // if we are here, we haven't consumed all input to be sent
+ // to the streaming binary - check if we are being called
+ // from close() on the map or reduce
+ //if(this.parentPlan.endOfAllInput) {
+ Result r = getNextHelper(t);
+ if(r.returnStatus == POStatus.STATUS_EOP) {
+ // we have now seen *ALL* possible input
+ // check if we ever had any real input
+ // in the course of the map/reduce - if we did
+ // then "initialized" will be true. If not, just
+ // send EOP down.
+ if(initialized) {
+ // signal End of ALL input to the Executable Manager's
+ // Input handler thread
+ binaryInputQueue.put(r);
+ // note this state for future calls
+ allInputFromPredecessorConsumed = true;
+ // look for output from binary
+ r = binaryOutputQueue.take();
+ if(r.returnStatus == POStatus.STATUS_EOS) {
+ // If we received EOS, it means all output
+ // from the streaming binary has been sent to us
+ // So we can send an EOP to the successor in
+ // the pipeline. Also since we are being called
+ // after all input from predecessor has been processed
+ // it means we got here from a call from close() in
+ // map or reduce. So once we send this EOP down,
+ // getNext() in POStream should never be called. So
+ // we don't need to set any flag noting we saw all output
+ // from binary
+ r.returnStatus = POStatus.STATUS_EOP;
+ }
+ }
+
+ } else if(r.returnStatus == POStatus.STATUS_EOS) {
+ // If we received EOS, it means all output
+ // from the streaming binary has been sent to us
+ // So we can send an EOP to the successor in
+ // the pipeline. Also we are being called
+ // from close() in map or reduce (this is so because
+ // only then this.parentPlan.endOfAllInput is true).
+ // So once we send this EOP down, getNext() in POStream
+ // should never be called. So we don't need to set any
+ // flag noting we saw all output from binary
+ r.returnStatus = POStatus.STATUS_EOP;
+ }
+ return r;
+// } else {
+// // we are not being called from close() - so
+// // we must be called from either map() or reduce()
+// // get the next Result from helper
+// Result r = getNextHelper(t);
+// if(r.returnStatus == POStatus.STATUS_EOS) {
+// // If we received EOS, it means all output
+// // from the streaming binary has been sent to us
+// // So we can send an EOP to the successor in
+// // the pipeline and also note this condition
+// // for future calls
+// r.returnStatus = POStatus.STATUS_EOP;
+// allOutputFromBinaryProcessed = true;
+// }
+// return r;
+// }
+
+ } catch(Exception e) {
+ throw new ExecException("Error while trying to get next result in POStream", e);
+ }
+
+
+ }
+
+
+
+}
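The queue handshake that getNext() above relies on reduces to a producer thread feeding a bounded queue and a consumer that converts the end-of-stream marker into an end-of-pipeline signal. A simplified sketch (the class, names, and string sentinel are stand-ins; the real operator exchanges Result objects with POStatus codes via the ExecutableManager):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class StreamHandshakeSketch {
        private static final String EOS = "<eos>";   // stand-in for POStatus.STATUS_EOS

        public static void main(String[] args) throws InterruptedException {
            final BlockingQueue<String> binaryOutputQueue = new ArrayBlockingQueue<String>(1);

            // stand-in for the ExecutableManager thread that drains the streaming binary's stdout
            Thread producer = new Thread(new Runnable() {
                public void run() {
                    try {
                        binaryOutputQueue.put("tuple-1");
                        binaryOutputQueue.put("tuple-2");
                        binaryOutputQueue.put(EOS);   // all output from the binary has been produced
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            producer.start();

            // stand-in for successive getNext() calls from the successor operator
            while (true) {
                String r = binaryOutputQueue.take();
                if (EOS.equals(r)) {
                    System.out.println("EOP");        // getNext() rewrites EOS as STATUS_EOP
                    break;
                }
                System.out.println(r);
            }
            producer.join();
        }
    }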
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java Thu Dec 11 14:29:29 2008
@@ -32,7 +32,7 @@
protected final Log log = LogFactory.getLog(getClass());
- protected ExecType execType = MAPREDUCE;
+ protected ExecType execType = LOCAL;
private MiniCluster cluster;
protected PigServer pigServer;
Added: hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class TestAlgebraicEvalLocal extends TestCase {
+
+ private int LOOP_COUNT = 512;
+
+
+ private PigServer pig;
+
+ @Before
+ @Override
+ protected void setUp() throws Exception {
+ pig = new PigServer("local");
+ }
+
+ Boolean[] nullFlags = new Boolean[]{ false, true};
+
+ //MiniCluster cluster = MiniCluster.buildCluster();
+ @Test
+ public void testGroupCountWithMultipleFields() throws Throwable {
+ File tmpFile = File.createTempFile("test", "txt");
+ for (int k = 0; k < nullFlags.length; k++) {
+ System.err.println("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k]);
+ // flag to indicate if both the keys forming
+ // the group key are null
+ int groupKeyWithNulls = 0;
+ if(nullFlags[k] == false) {
+ // generate data with no nulls
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ for(int j=0; j< LOOP_COUNT; j++) {
+ ps.println(i + "\t" + i + "\t" + j%2);
+ }
+ }
+ ps.close();
+ } else {
+ // generate data with nulls
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ Random r = new Random();
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ int rand = r.nextInt(LOOP_COUNT);
+ if(rand <= (0.2 * LOOP_COUNT) ) {
+ for(int j=0; j< LOOP_COUNT; j++) {
+ ps.println("\t" + i + "\t" + j%2);
+ }
+ } else if (rand > (0.2 * LOOP_COUNT) && rand <= (0.4 * LOOP_COUNT)) {
+ for(int j=0; j< LOOP_COUNT; j++) {
+ ps.println(i + "\t" + "\t" + j%2);
+ }
+ } else if (rand > (0.4 * LOOP_COUNT) && rand <= (0.6 * LOOP_COUNT)) {
+ for(int j=0; j< LOOP_COUNT; j++) {
+ ps.println("\t" + "\t" + j%2);
+ }
+ groupKeyWithNulls++;
+ } else {
+ for(int j=0; j< LOOP_COUNT; j++) {
+ ps.println(i + "\t" + i + "\t" + j%2);
+ }
+ }
+ }
+ ps.close();
+ }
+ pig.registerQuery(" a = group (load '" + Util.generateURI(tmpFile.toString()) + "') by ($0,$1);");
+ pig.registerQuery("b = foreach a generate flatten(group), SUM($1.$2);");
+ Iterator<Tuple> it = pig.openIterator("b");
+ int count = 0;
+ System.err.println("XX Starting");
+ while(it.hasNext()){
+ Tuple t = it.next();
+ System.err.println("XX "+ t);
+ int sum = ((Double)t.get(2)).intValue();
+ // if the first two fields (output of flatten(group))
+ // are both nulls then we should change the sum accordingly
+ if(t.get(0) == null && t.get(1) == null)
+ assertEquals( "Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k],
+ (LOOP_COUNT/2)*groupKeyWithNulls, sum);
+ else
+ assertEquals("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k],
+ LOOP_COUNT/2, sum);
+
+ count++;
+ }
+ System.err.println("XX done");
+ if(groupKeyWithNulls == 0)
+ assertEquals("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k], LOOP_COUNT, count);
+ else
+ assertEquals("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k], LOOP_COUNT - groupKeyWithNulls + 1, count);
+
+ }
+ tmpFile.delete();
+
+ }
+
+ @Test
+ public void testSimpleCount() throws Exception {
+ File tmpFile = File.createTempFile("test", "txt");
+ for (int i = 0; i < nullFlags.length; i++) {
+ System.err.println("Testing testSimpleCount with null flag:" + nullFlags[i]);
+
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ int numNulls = generateInput(ps, nullFlags[i]);
+ String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "') all) generate COUNT($1);";
+ System.out.println(query);
+ pig.registerQuery(query);
+ Iterator it = pig.openIterator("myid");
+ tmpFile.delete();
+ Tuple t = (Tuple)it.next();
+ Long count = DataType.toLong(t.get(0));
+ assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], count.longValue(), LOOP_COUNT);
+ }
+ }
+
+ @Test
+ public void testGroupCount() throws Throwable {
+ File tmpFile = File.createTempFile("test", "txt");
+ for (int i = 0; i < nullFlags.length; i++) {
+ System.err.println("Testing testGroupCount with null flag:" + nullFlags[i]);
+
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ int numNulls = generateInput(ps, nullFlags[i]);
+ String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "') all) generate group, COUNT($1) ;";
+ System.out.println(query);
+ pig.registerQuery(query);
+ Iterator it = pig.openIterator("myid");
+ tmpFile.delete();
+ Tuple t = (Tuple)it.next();
+ Long count = DataType.toLong(t.get(1));
+ assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], count.longValue(), LOOP_COUNT);
+ }
+ }
+
+ @Test
+ public void testGroupReorderCount() throws Throwable {
+ File tmpFile = File.createTempFile("test", "txt");
+ for (int i = 0; i < nullFlags.length; i++) {
+ System.err.println("Testing testGroupCount with null flag:" + nullFlags[i]);
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ int numNulls = generateInput(ps, nullFlags[i]);
+ String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "') all) generate COUNT($1), group ;";
+ System.out.println(query);
+ pig.registerQuery(query);
+ Iterator it = pig.openIterator("myid");
+ tmpFile.delete();
+ Tuple t = (Tuple)it.next();
+ Long count = DataType.toLong(t.get(0));
+ assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], count.longValue(), LOOP_COUNT);
+ }
+ }
+
+
+
+ @Test
+ public void testGroupUniqueColumnCount() throws Throwable {
+ File tmpFile = File.createTempFile("test", "txt");
+ for (int i = 0; i < nullFlags.length; i++) {
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ long groupsize = 0;
+ if(nullFlags[i] == false) {
+ // generate data without nulls
+ for(int j = 0; j < LOOP_COUNT; j++) {
+ if(j%10 == 0) groupsize++;
+ ps.println(j%10 + ":" + j);
+ }
+ } else {
+ // generate data with nulls
+ for(int j = 0; j < LOOP_COUNT; j++) {
+ if(j%10 == 0) groupsize++;
+ if(j % 20 == 0) {
+ // for half the groups
+ // emit nulls
+ ps.println(j%10 + ":");
+ } else {
+ ps.println(j%10 + ":" + j);
+ }
+ }
+ }
+ ps.close();
+ String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1) ;";
+ System.out.println(query);
+ pig.registerQuery(query);
+ Iterator it = pig.openIterator("myid");
+ tmpFile.delete();
+ System.err.println("Output from testGroupUniqueColumnCount");
+ while(it.hasNext()) {
+ Tuple t = (Tuple)it.next();
+ System.err.println(t);
+ String a = t.get(0).toString();
+ Double group = Double.valueOf(a.toString());
+ if(group == 0.0) {
+ Long count = DataType.toLong(t.get(1));
+ // right now count with nulls is same as
+ // count without nulls
+ assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], groupsize, count.longValue());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testGroupDuplicateColumnCount() throws Throwable {
+ File tmpFile = File.createTempFile("test", "txt");
+ for (int i = 0; i < nullFlags.length; i++) {
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ long groupsize = 0;
+ if(nullFlags[i] == false) {
+ // generate data without nulls
+ for(int j = 0; j < LOOP_COUNT; j++) {
+ if(j%10 == 0) groupsize++;
+ ps.println(j%10 + ":" + j);
+ }
+ } else {
+ // generate data with nulls
+ for(int j = 0; j < LOOP_COUNT; j++) {
+ if(j%10 == 0) groupsize++;
+ if(j % 20 == 0) {
+ // for half the groups
+ // emit nulls
+ ps.println(j%10 + ":");
+ } else {
+ ps.println(j%10 + ":" + j);
+ }
+ }
+ }
+ ps.close();
+ String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1), COUNT($1.$0) ;";
+ System.out.println(query);
+ pig.registerQuery(query);
+ Iterator it = pig.openIterator("myid");
+ tmpFile.delete();
+ System.err.println("Output from testGroupDuplicateColumnCount");
+ while(it.hasNext()) {
+ Tuple t = (Tuple)it.next();
+ System.err.println(t);
+ String a = t.get(0).toString();
+ Double group = Double.valueOf(a.toString());
+ if(group == 0.0) {
+ // right now count with nulls is same
+ // as count without nulls
+ Long count = DataType.toLong(t.get(2));
+ assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
+ count = DataType.toLong(t.get(1));
+ assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
+ }
+ }
+ }
+ }
+
+ private int generateInput(PrintStream ps, boolean withNulls ) {
+ int numNulls = 0;
+ if(withNulls) {
+ // inject nulls randomly
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ int rand = new Random().nextInt(LOOP_COUNT);
+ if(rand <= (0.3 * LOOP_COUNT) ) {
+ ps.println(":");
+ numNulls++;
+ } else {
+ ps.println(i + ":" + i);
+ }
+ }
+ } else {
+ for(int i = 0; i < LOOP_COUNT; i++) {
+ ps.println(i + ":" + i);
+ }
+ }
+ ps.close();
+ return numNulls;
+ }
+
+}