You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC
svn commit: r883836 [21/23] - in /hadoop/pig/branches/load-store-redesign:
./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/
contrib/zebra/ contrib/zebr...
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/builtin/GFCross.java Tue Nov 24 19:54:19 2009
@@ -33,7 +33,7 @@
BagFactory mBagFactory = BagFactory.getInstance();
TupleFactory mTupleFactory = TupleFactory.getInstance();
- public static int DEFAULT_PARALLELISM = 96;
+ public static final int DEFAULT_PARALLELISM = 96;
@Override
public DataBag exec(Tuple input) throws IOException {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/io/FileLocalizer.java Tue Nov 24 19:54:19 2009
@@ -391,7 +391,9 @@
// TODO probably this should be replaced with the local file system
File f = (new File(fileSpec)).getParentFile();
if (f!=null){
- f.mkdirs();
+ boolean res = f.mkdirs();
+ if (!res)
+ log.warn("FileLocalizer.create: failed to create " + f);
}
return new FileOutputStream(fileSpec,append);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Tue Nov 24 19:54:19 2009
@@ -57,7 +57,6 @@
//get the attributes of cogroup that are modified during the trnalsation
MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cg.getGroupByPlans();
-
for(LogicalOperator op: cg.getInputs()) {
ArrayList<LogicalPlan> newGByPlans = new ArrayList<LogicalPlan>();
for(LogicalPlan lp: mapGByPlans.get(op)) {
@@ -70,9 +69,41 @@
newGByPlans.add(lp);
}
}
+
+
mapGByPlans.removeKey(op);
mapGByPlans.put(op, newGByPlans);
}
+
+ // check if after translation none of group by plans in a cogroup
+ // have a project(*) - if they still do it's because the input
+ // for the project(*) did not have a schema - in this case, we should
+ // error out since we could have different number/types of
+ // cogroup keys
+ if(cg.getInputs().size() > 1) { // only for cogroups
+ for(LogicalOperator op: cg.getInputs()) {
+ for(LogicalPlan lp: mapGByPlans.get(op)) {
+ if(checkPlanForProjectStar(lp)) {
+ // not following Error handling guidelines to give error code
+ // and error source since this will get swallowed by the parser
+ // which will just return a ParseException
+ throw new VisitorException("Cogroup/Group by * is only allowed if " +
+ "the input has a schema");
+ }
+ }
+ }
+ // check if after translation all group by plans have same arity
+ int arity = mapGByPlans.get(cg.getInputs().get(0)).size();
+ for(LogicalOperator op: cg.getInputs()) {
+ if(arity != mapGByPlans.get(op).size()) {
+ // not following Error handling guidelines to give error code
+ // and error source since this will get swallowed by the parser
+ // which will just return a ParseException
+ throw new VisitorException("The arity of cogroup/group by columns " +
+ "do not match");
+ }
+ }
+ }
}
/* (non-Javadoc)
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Tue Nov 24 19:54:19 2009
@@ -163,12 +163,14 @@
// check if any of the foreach's peers is a foreach flatten
// if so then this rule does not apply
- for(LogicalOperator peer: peers) {
- if(!peer.equals(foreach)) {
- if(peer instanceof LOForEach) {
- LOForEach peerForeach = (LOForEach)peer;
- if(peerForeach.hasFlatten().first) {
- return false;
+ if (peers != null){
+ for(LogicalOperator peer: peers) {
+ if(!peer.equals(foreach)) {
+ if(peer instanceof LOForEach) {
+ LOForEach peerForeach = (LOForEach)peer;
+ if(peerForeach.hasFlatten().first) {
+ return false;
+ }
}
}
}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/optimizer/StreamOptimizer.java Tue Nov 24 19:54:19 2009
@@ -210,7 +210,7 @@
LogicalOperator lo = nodes.get(0);
if (lo == null || !(lo instanceof LOStream)) {
throw new RuntimeException("Expected stream, got " +
- lo.getClass().getName());
+ (lo == null ? lo : lo.getClass().getName()));
}
LOStream stream = (LOStream)lo;
if(mOptimizeLoad) {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Nov 24 19:54:19 2009
@@ -173,15 +173,14 @@
static int undollar(String s) {
return Integer.parseInt(s.substring(1, s.length()));
}
-
-
+
Path getCurrentDir(PigContext pigContext) throws IOException {
DataStorage dfs = pigContext.getDfs();
ContainerDescriptor desc = dfs.getActiveContainer();
ElementDescriptor el = dfs.asElement(desc);
return new Path(el.toString());
}
-
+
LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
log.trace("Entering parseCogroup");
@@ -237,15 +236,6 @@
*/
LogicalOperator parseJoin(ArrayList<CogroupInput> gis, LogicalPlan lp, LOJoin.JOINTYPE jt) throws ParseException, PlanException{
log.trace("Entering parseJoin");
- // Skewed Join behaves as regular join in local mode
- if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.SKEWED) {
- return rewriteJoin(gis,lp);
- }
-
- // Merge Join behaves as regular join in local mode
- if (pigContext.getExecType() == ExecType.LOCAL && jt == LOJoin.JOINTYPE.MERGE) {
- return rewriteJoin(gis,lp);
- }
int n = gis.size();
@@ -709,6 +699,56 @@
return output.toString() ;
}
+
+ public static String join(AbstractCollection<String> s, String delimiter) {
+ if (s.isEmpty()) return "";
+ Iterator<String> iter = s.iterator();
+ StringBuffer buffer = new StringBuffer(iter.next());
+ while (iter.hasNext()) {
+ buffer.append(delimiter);
+ buffer.append(iter.next());
+ }
+ return buffer.toString();
+ }
+
+
+ public static String[] getPathStrings(String commaSeparatedPaths) {
+ int length = commaSeparatedPaths.length();
+ int curlyOpen = 0;
+ int pathStart = 0;
+ boolean globPattern = false;
+ List<String> pathStrings = new ArrayList<String>();
+
+ for (int i=0; i<length; i++) {
+ char ch = commaSeparatedPaths.charAt(i);
+ switch(ch) {
+ case '{' : {
+ curlyOpen++;
+ if (!globPattern) {
+ globPattern = true;
+ }
+ break;
+ }
+ case '}' : {
+ curlyOpen--;
+ if (curlyOpen == 0 && globPattern) {
+ globPattern = false;
+ }
+ break;
+ }
+ case ',' : {
+ if (!globPattern) {
+ pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
+ pathStart = i + 1 ;
+ }
+ break;
+ }
+ }
+ }
+ pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
+
+ return pathStrings.toArray(new String[0]);
+ }
}
class FunctionType {
@@ -879,19 +919,19 @@
)
{
if(null != root) {
- log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
-
- //Translate all the project(*) leaves in the plan to a sequence of projections
- ProjectStarTranslator translate = new ProjectStarTranslator(lp);
- translate.visit();
-
- addLogicalPlan(root, lp);
-
try {
- log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
+ log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+
+ //Translate all the project(*) leaves in the plan to a sequence of projections
+ ProjectStarTranslator translate = new ProjectStarTranslator(lp);
+ translate.visit();
+
+ addLogicalPlan(root, lp);
+
+ log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
} catch(FrontendException fee) {
- ParseException pe = new ParseException(fee.getMessage());
- pe.initCause(fee);
+ ParseException pe = new ParseException(fee.getMessage());
+ pe.initCause(fee);
throw pe;
}
}
@@ -1157,7 +1197,14 @@
)
| (<STORE> op = StoreClause(lp))
)
- [<PARALLEL> t2=<INTEGER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
+ [<PARALLEL> t2=<INTEGER> {
+ // In Local Mode we can only use one reducer
+ if( this.pigContext.getExecType() == ExecType.LOCAL ) {
+ op.setRequestedParallelism(1);
+ } else {
+ op.setRequestedParallelism(Integer.parseInt(t2.image));
+ }
+ } ]
)
{log.trace("Exiting BaseExpr"); return op;}
}
@@ -1944,21 +1991,18 @@
// For all types of join we create LOJoin and mark what type of join it is.
(
[<USING> ("\"replicated\"" {
- if(isOuter) {
- throw new ParseException("Replicated join does not support (left|right|full) outer joins");
- }
+ if(isFullOuter || isRightOuter) {
+ throw new ParseException("Replicated join does not support (right|full) outer joins");
+ }
frj = parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
}
| "\"repl\"" {
- if(isOuter) {
- throw new ParseException("Replicated join does not support (left|right|full) outer joins");
- }
+ if(isFullOuter || isRightOuter) {
+ throw new ParseException("Replicated join does not support (right|full) outer joins");
+ }
frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
}
- |"\"skewed\"" {
- if(isOuter) {
- throw new ParseException("Skewed join does not support (left|right|full) outer joins");
- }
+ |"\"skewed\"" {
skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
}
|"\"merge\"" {
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Nov 24 19:54:19 2009
@@ -116,7 +116,7 @@
* request. In order to ensure unique and consistent names, across
* all field schema objects, the object is made static.
*/
- public static CanonicalNamer canonicalNamer = new CanonicalNamer();
+ public static final CanonicalNamer canonicalNamer = new CanonicalNamer();
private static Log log = LogFactory.getLog(Schema.FieldSchema.class);
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Tue Nov 24 19:54:19 2009
@@ -81,7 +81,7 @@
private String currentAlias = null;
- public static MultiMap<Byte, Byte> castLookup = new MultiMap<Byte, Byte>();
+ public static final MultiMap<Byte, Byte> castLookup = new MultiMap<Byte, Byte>();
static{
//Ordering here decides the score for the best fit function.
//Do not change the order. Conversions to a smaller type is preferred
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/Operator.java Tue Nov 24 19:54:19 2009
@@ -94,13 +94,10 @@
*/
@Override
public boolean equals(Object obj) {
- if(obj instanceof Operator){
- Operator opObj = (Operator)obj;
- if(obj==this)
- return true;
- return mKey.equals(opObj);
- }
- return false;
+ if(obj==this)
+ return true;
+ else
+ return false;
}
/**
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/plan/OperatorPlan.java Tue Nov 24 19:54:19 2009
@@ -74,7 +74,7 @@
private List<E> mRoots;
private List<E> mLeaves;
- protected static Log log = LogFactory.getLog(OperatorPlan.class);
+ protected static final Log log = LogFactory.getLog(OperatorPlan.class);
public OperatorPlan() {
mRoots = new ArrayList<E>();
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/ExecutableManager.java Tue Nov 24 19:54:19 2009
@@ -347,12 +347,12 @@
// We should receive an EOP only when *ALL* input
// for this process has already been sent and no
// more input is expected
- if (inp.returnStatus == POStatus.STATUS_EOP) {
+ if (inp != null && inp.returnStatus == POStatus.STATUS_EOP) {
// signal cleanup in ExecutableManager
close();
return;
}
- if (inp.returnStatus == POStatus.STATUS_OK) {
+ if (inp != null && inp.returnStatus == POStatus.STATUS_OK) {
// Check if there was a problem with the managed process
if (outerrThreadsError != null) {
throw new IOException(
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/streaming/StreamingCommand.java Tue Nov 24 19:54:19 2009
@@ -502,10 +502,16 @@
}
public boolean equals(Object obj) {
- HandleSpec other = (HandleSpec)obj;
- return (other != null && name.equals(other.name) && spec.equals(other.spec));
+ if (obj instanceof HandleSpec){
+ HandleSpec other = (HandleSpec)obj;
+ return (other != null && name.equals(other.name) && spec.equals(other.spec));
+ } else
+ return false;
}
+ public int hashCode() {
+ return name.hashCode();
+ }
public Object clone() {
try {
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/CompilerUtils.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.IsEmpty;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+/*
+ * A class to add util functions that gets used by LogToPhyTranslator and MRCompiler
+ *
+ */
+public class CompilerUtils {
+
+ public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException {
+ // we currently have POProject[bag] as the only operator in the plan
+ // If the bag is an empty bag, we should replace
+ // it with a bag with one tuple with null fields so that when we flatten
+ // we do not drop records (flatten will drop records if the bag is left
+ // as an empty bag) and actually project nulls for the fields in
+ // the empty bag
+
+ // So we need to get to the following state:
+ // POProject[Bag]
+ // \
+ // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
+ // \ | /
+ // POBinCond
+ POProject relationProject = (POProject) fePlan.getRoots().get(0);
+ try {
+
+ // condition of the bincond
+ POProject relationProjectForIsEmpty = relationProject.clone();
+ fePlan.add(relationProjectForIsEmpty);
+ String scope = relationProject.getOperatorKey().scope;
+ FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
+ Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
+ POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+ getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f);
+ isEmpty.setResultType(DataType.BOOLEAN);
+ fePlan.add(isEmpty);
+ fePlan.connect(relationProjectForIsEmpty, isEmpty);
+
+ // lhs of bincond (const bag with null fields)
+ ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ // the following should give a tuple with the
+ // required number of nulls
+ Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
+ for(int i = 0; i < inputSchema.size(); i++) {
+ t.set(i, null);
+ }
+ List<Tuple> bagContents = new ArrayList<Tuple>(1);
+ bagContents.add(t);
+ DataBag bg = new NonSpillableDataBag(bagContents);
+ ce.setValue(bg);
+ ce.setResultType(DataType.BAG);
+ //this operator doesn't have any predecessors
+ fePlan.add(ce);
+
+ //rhs of bincond is the original project
+ // let's set up the bincond now
+ POBinCond bincond = new POBinCond(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ bincond.setCond(isEmpty);
+ bincond.setLhs(ce);
+ bincond.setRhs(relationProject);
+ bincond.setResultType(DataType.BAG);
+ fePlan.add(bincond);
+
+ fePlan.connect(isEmpty, bincond);
+ fePlan.connect(ce, bincond);
+ fePlan.connect(relationProject, bincond);
+
+ } catch (Exception e) {
+ throw new PlanException("Error setting up outerjoin", e);
+ }
+
+ }
+
+}
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/JarManager.java Tue Nov 24 19:54:19 2009
@@ -120,10 +120,9 @@
// log.error("Adding extra " + pigContext.extraJars.get(i));
mergeJar(jarFile, pigContext.extraJars.get(i), null, contents);
}
- if (pigContext != null) {
- jarFile.putNextEntry(new ZipEntry("pigContext"));
- new ObjectOutputStream(jarFile).writeObject(pigContext);
- }
+
+ jarFile.putNextEntry(new ZipEntry("pigContext"));
+ new ObjectOutputStream(jarFile).writeObject(pigContext);
jarFile.close();
}
Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/impl/util/UDFContext.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.util;
+
+import java.io.IOException;
+//import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.pig.impl.util.ObjectSerializer;
+
+public class UDFContext {
+
+ private Configuration jconf = null;
+ private HashMap<Integer, Properties> udfConfs;
+
+ private static UDFContext self = null;
+
+ private UDFContext() {
+ udfConfs = new HashMap<Integer, Properties>();
+ }
+
+ public static UDFContext getUDFContext() {
+ if (self == null) {
+ self = new UDFContext();
+ }
+ return self;
+ }
+
+ /**
+ * Adds the JobConf to this singleton. Will be
+ * called on the backend by the Map and Reduce
+ * functions so that UDFs can obtain the JobConf
+ * on the backend.
+ */
+ public void addJobConf(Configuration conf) {
+ jconf = conf;
+ }
+
+ /**
+ * Get the JobConf. This should only be called on
+ * the backend. It will return null on the frontend.
+ * @return JobConf for this job. This is a copy of the
+ * JobConf. Nothing written here will be kept by the system.
+ * getUDFConf should be used for recording UDF specific
+ * information.
+ */
+ public Configuration getJobConf() {
+ if (jconf != null) return new Configuration(jconf);
+ else return null;
+ }
+
+ /**
+ * Get a properties object that is specific to this UDF.
+ * Note that if a given UDF is called multiple times in a script,
+ * and each instance passes different arguments, then each will
+ * be provided with different configuration object.
+ * This can be used by loaders to pass their input object path
+ * or URI and separate themselves from other instances of the
+ * same loader. Constructor arguments could also be used,
+ * as they are available on both the front and back end.
+ *
+ * Note that this can only be used to share information
+ * across instantiations of the same function in the front end
+ * and between front end and back end. It cannot be used to
+ * share information between instantiations (that is, between
+ * map and/or reduce instances) on the back end at runtime.
+ * @param c of the UDF obtaining the properties object.
+ * @param args String arguments that make this instance of
+ * the UDF unique.
+ * @return A reference to the properties object specific to
+ * the calling UDF. This is a reference, not a copy.
+ * Any changes to this object will automatically be
+ * propogated to other instances of the UDF calling this
+ * function.
+ */
+
+ @SuppressWarnings("unchecked")
+ public Properties getUDFProperties(Class c, String[] args) {
+ Integer k = generateKey(c, args);
+ Properties p = udfConfs.get(k);
+ if (p == null) {
+ p = new Properties();
+ udfConfs.put(k, p);
+ }
+ return p;
+ }
+
+ /**
+ * Get a properties object that is specific to this UDF.
+ * Note that if a given UDF is called multiple times in a script,
+ * they will all be provided the same configuration object. It
+ * is up to the UDF to make sure the multiple instances do not
+ * stomp on each other.
+ *
+ * It is guaranteed that this properties object will be separate
+ * from that provided to any other UDF.
+ *
+ * Note that this can only be used to share information
+ * across instantiations of the same function in the front end
+ * and between front end and back end. It cannot be used to
+ * share information between instantiations (that is, between
+ * map and/or reduce instances) on the back end at runtime.
+ * @param c of the UDF obtaining the properties object.
+ * @return A reference to the properties object specific to
+ * the calling UDF. This is a reference, not a copy.
+ * Any changes to this object will automatically be
+ * propogated to other instances of the UDF calling this
+ * function.
+ */
+ @SuppressWarnings("unchecked")
+ public Properties getUDFProperties(Class c) {
+ Integer k = generateKey(c);
+ Properties p = udfConfs.get(k);
+ if (p == null) {
+ p = new Properties();
+ udfConfs.put(k, p);
+ }
+ return p;
+ }
+
+ /**
+ * Serialize the UDF specific information into an instance
+ * of JobConf. This function is intended to be called on
+ * the front end in preparation for sending the data to the
+ * backend.
+ * @param conf JobConf to serialize into
+ * @throws IOException if underlying serialization throws it
+ */
+ public void serialize(Configuration conf) throws IOException {
+ conf.set("pig.UDFContext", ObjectSerializer.serialize(udfConfs));
+ }
+
+ /**
+ * Populate the udfConfs field. This function is intended to
+ * be called by Map.configure or Reduce.configure on the backend.
+ * It assumes that addJobConf has already been called.
+ * @throws IOException if underlying deseralization throws it
+ */
+ @SuppressWarnings("unchecked")
+ public void deserialize() throws IOException {
+ udfConfs = (HashMap<Integer, Properties>)ObjectSerializer.deserialize(jconf.get("pig.UDFContext"));
+ }
+
+ @SuppressWarnings("unchecked")
+ private int generateKey(Class c) {
+ return c.getName().hashCode();
+ }
+
+ @SuppressWarnings("unchecked")
+ private int generateKey(Class c, String[] args) {
+ int hc = c.getName().hashCode();
+ for (int i = 0; i < args.length; i++) {
+ hc <<= 1;
+ hc ^= args[i].hashCode();
+ }
+ return hc;
+ }
+
+}
\ No newline at end of file
Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java Tue Nov 24 19:54:19 2009
@@ -18,8 +18,11 @@
package org.apache.pig.tools.pigstats;
+import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
@@ -41,7 +44,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
import org.apache.pig.impl.util.ObjectSerializer;
public class PigStats {
@@ -54,6 +56,8 @@
ArrayList<String> rootJobIDs = new ArrayList<String>();
ExecType mode;
+ private static final String localModeDataFile = "part-00000";
+
public void setMROperatorPlan(MROperPlan mrp) {
this.mrp = mrp;
}
@@ -99,11 +103,25 @@
//The counter placed before a store in the local plan should be able to get the number of records
for(PhysicalOperator op : php.getLeaves()) {
Map<String, String> jobStats = new HashMap<String, String>();
- stats.put(op.toString(), jobStats);
- POCounter counter = (POCounter) php.getPredecessors(op).get(0);
- jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(counter.getCount())).toString());
+ stats.put(op.toString(), jobStats);
String localFilePath=normalizeToLocalFilePath(((POStore)op).getSFile().getFileName());
- jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(new File(localFilePath).length())).toString());
+ File outputFile = new File( localFilePath + File.separator + localModeDataFile );
+
+ long lineCounter = 0;
+ try {
+ BufferedReader in = new BufferedReader(new FileReader( outputFile ));
+ @SuppressWarnings("unused")
+ String tmpString = null;
+ while( (tmpString = in.readLine()) != null ) {
+ lineCounter++;
+ }
+ in.close();
+ } catch (FileNotFoundException e) {
+ } catch (IOException e) {
+ } finally {
+ jobStats.put("PIG_STATS_LOCAL_OUTPUT_RECORDS", (Long.valueOf(lineCounter)).toString());
+ }
+ jobStats.put("PIG_STATS_LOCAL_BYTES_WRITTEN", (Long.valueOf(outputFile.length())).toString());
}
return stats;
}
@@ -266,10 +284,10 @@
}
public long getBytesWritten() {
- if(mode == ExecType.LOCAL) {
- return getLocalBytesWritten();
- } else if(mode == ExecType.MAPREDUCE) {
- return getMapReduceBytesWritten();
+ if(mode == ExecType.LOCAL) {
+ return getLocalBytesWritten();
+ } else if( mode == ExecType.MAPREDUCE ) {
+ return getMapReduceBytesWritten();
} else {
throw new RuntimeException("Unrecognized mode. Either MapReduce or Local mode expected.");
}
Modified: hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml Tue Nov 24 19:54:19 2009
@@ -37,6 +37,9 @@
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
<Match>
+ <Bug pattern="DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED" />
+ </Match>
+ <Match>
<Class name="org.apache.pig.tools.parameters.Token" />
</Match>
<Match>
@@ -138,14 +141,43 @@
<Bug pattern="SE_TRANSIENT_FIELD_NOT_RESTORED" />
</Match>
<Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce" />
+ <Field name="sJobConf" />
+ <Bug pattern="MS_CANNOT_BE_FINAL" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator" />
+ <Field name="reporter" />
+ <Bug pattern="MS_CANNOT_BE_FINAL" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator" />
+ <Field name="pigLogger" />
+ <Bug pattern="MS_PKGPROTECT" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.impl.logicalLayer.LogicalPlanCloneHelper" />
+ <Field name="mOpToCloneMap" />
+ <Bug pattern="MS_PKGPROTECT" />
+ </Match>
+ <Match>
<Class name="org.apache.pig.impl.util.SpillableMemoryManager" />
<Bug pattern="DM_GC" />
</Match>
<Match>
+ <Class name="org.apache.pig.impl.logicalLayer.LogicalPlanBuilder" />
+ <Field name="classloader" />
+ <Bug pattern="MS_PKGPROTECT" />
+ </Match>
+ <Match>
<Class name="org.apache.pig.data.DistinctDataBag$DistinctDataBagIterator$TContainer" />
<Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
</Match>
<Match>
+ <Class name="org.apache.pig.data.InternalDistinctBag$DistinctDataBagIterator$TContainer" />
+ <Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
+ </Match>
+ <Match>
<Bug pattern="BC_BAD_CAST_TO_CONCRETE_COLLECTION" />
</Match>
<!-- This Tuple classes are not used -->
@@ -236,5 +268,61 @@
<Method name = "sendMTFValues" />
<Bug pattern="IM_BAD_CHECK_FOR_ODD" />
</Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger" />
+ <Bug pattern="UG_SYNC_SET_UNSYNC_GET" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream" />
+ <Bug pattern="IS2_INCONSISTENT_SYNC" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.impl.builtin.DefaultIndexableLoader" />
+ <Bug pattern="UWF_NULL_FIELD" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.impl.builtin.MergeJoinIndexer" />
+ <Bug pattern="UWF_NULL_FIELD" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler" />
+ <Bug pattern="NM_WRONG_PACKAGE" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NoopStoreRemover$PhysicalRemover" />
+ <Bug pattern="NM_WRONG_PACKAGE" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter" />
+ <Bug pattern="NM_WRONG_PACKAGE" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.impl.logicalLayer.schema.Schema" />
+ <Method name = "equals" />
+ <Bug pattern="NP_NULL_ON_SOME_PATH" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.impl.logicalLayer.schema.Schema$FieldSchema" />
+ <Method name = "equals" />
+ <Bug pattern="NP_NULL_ON_SOME_PATH" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.impl.logicalLayer.optimizer.StreamOptimizer" />
+ <Method name = "transform" />
+ <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.impl.logicalLayer.LogicalPlanBuilder" />
+ <Field name = "classloader" />
+ <Bug pattern="MS_CANNOT_BE_FINAL" />
+ </Match>
+ <Match>
+ <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE" />
+ </Match>
+ <Match>
+ <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression" />
+ <Field name = "res" />
+ <Bug pattern="MF_CLASS_MASKS_FIELD" />
+ </Match>
</FindBugsFilter>
Added: hadoop/pig/branches/load-store-redesign/test/hbase-site.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/hbase-site.xml?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/hbase-site.xml (added)
+++ hadoop/pig/branches/load-store-redesign/test/hbase-site.xml Tue Nov 24 19:54:19 2009
@@ -0,0 +1,137 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>hbase.regionserver.msginterval</name>
+ <value>1000</value>
+ <description>Interval between messages from the RegionServer to HMaster
+ in milliseconds. Default is 15. Set this value low if you want unit
+ tests to be responsive.
+ </description>
+ </property>
+ <property>
+ <name>hbase.client.pause</name>
+ <value>5000</value>
+ <description>General client pause value. Used mostly as value to wait
+ before running a retry of a failed get, region lookup, etc.</description>
+ </property>
+ <property>
+ <name>hbase.master.meta.thread.rescanfrequency</name>
+ <value>10000</value>
+ <description>How long the HMaster sleeps (in milliseconds) between scans of
+ the root and meta tables.
+ </description>
+ </property>
+ <property>
+ <name>hbase.server.thread.wakefrequency</name>
+ <value>1000</value>
+ <description>Time to sleep in between searches for work (in milliseconds).
+ Used as sleep interval by service threads such as META scanner and log roller.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.handler.count</name>
+ <value>5</value>
+ <description>Count of RPC Server instances spun up on RegionServers
+ Same property is used by the HMaster for count of master handlers.
+ Default is 10.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.lease.period</name>
+ <value>6000</value>
+ <description>Length of time the master will wait before timing out a region
+ server lease. Since region servers report in every second (see above), this
+ value has been reduced so that the master will notice a dead region server
+ sooner. The default is 30 seconds.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.info.port</name>
+ <value>-1</value>
+ <description>The port for the hbase master web UI
+ Set to -1 if you do not want the info server to run.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.info.port</name>
+ <value>-1</value>
+ <description>The port for the hbase regionserver web UI
+ Set to -1 if you do not want the info server to run.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.info.port.auto</name>
+ <value>true</value>
+ <description>Info server auto port bind. Enables automatic port
+ search if hbase.regionserver.info.port is already in use.
+ Enabled for testing to run multiple tests on one machine.
+ </description>
+ </property>
+ <property>
+ <name>hbase.master.lease.thread.wakefrequency</name>
+ <value>3000</value>
+ <description>The interval between checks for expired region server leases.
+ This value has been reduced due to the other reduced values above so that
+ the master will notice a dead region server sooner. The default is 15 seconds.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.optionalcacheflushinterval</name>
+ <value>10000</value>
+ <description>
+ Amount of time to wait since the last time a region was flushed before
+ invoking an optional cache flush. Default 60,000.
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.safemode</name>
+ <value>false</value>
+ <description>
+ Turn on/off safe mode in region server. Always on for production, always off
+ for tests.
+ </description>
+ </property>
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>67108864</value>
+ <description>
+ Maximum desired file size for an HRegion. If filesize exceeds
+ value + (value / 2), the HRegion is split in two. Default: 256M.
+
+ Keep the maximum filesize small so we split more often in tests.
+ </description>
+ </property>
+ <property>
+ <name>hadoop.log.dir</name>
+ <value>${user.dir}/../logs</value>
+ </property>
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>21810</value>
+ <description>Property from ZooKeeper's config zoo.cfg.
+ The port at which the clients will connect.
+ </description>
+ </property>
+</configuration>
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestAccumulator.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestAccumulator extends TestCase{
+ private static final String INPUT_FILE = "AccumulatorInput.txt";
+ private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
+ private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
+
+ private PigServer pigServer;
+ private MiniCluster cluster = MiniCluster.buildCluster();
+
+ public TestAccumulator() throws ExecException, IOException{
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ // pigServer = new PigServer(ExecType.LOCAL);
+ pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");
+ pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", "2");
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ createFiles();
+ }
+
+ private void createFiles() throws IOException {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+
+ w.println("100\tapple");
+ w.println("200\torange");
+ w.println("300\tstrawberry");
+ w.println("300\tpear");
+ w.println("100\tapple");
+ w.println("300\tpear");
+ w.println("400\tapple");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+ w = new PrintWriter(new FileWriter(INPUT_FILE2));
+
+ w.println("100\t");
+ w.println("100\t");
+ w.println("200\t");
+ w.println("200\t");
+ w.println("300\tstrawberry");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
+
+ w = new PrintWriter(new FileWriter(INPUT_FILE3));
+
+ w.println("100\t1.0");
+ w.println("100\t2.0");
+ w.println("200\t1.1");
+ w.println("200\t2.1");
+ w.println("100\t3.0");
+ w.println("100\t4.0");
+ w.println("200\t3.1");
+ w.println("100\t5.0");
+ w.println("300\t3.3");
+ w.close();
+
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ new File(INPUT_FILE).delete();
+ Util.deleteFile(cluster, INPUT_FILE);
+ new File(INPUT_FILE2).delete();
+ Util.deleteFile(cluster, INPUT_FILE2);
+ new File(INPUT_FILE3).delete();
+ Util.deleteFile(cluster, INPUT_FILE3);
+ }
+
+
+ public void testAccumBasic() throws IOException{
+ // test group by
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 2);
+ expected.put(200, 1);
+ expected.put(300, 3);
+ expected.put(400, 1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);");
+
+ try{
+ iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ fail("accumulator should not be called.");
+ }catch(IOException e) {
+ // should throw exception from AccumulatorBagCount.
+ }
+
+ // test cogroup
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("C = cogroup A by id, B by id;");
+ pigServer.registerQuery("D = foreach C generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");
+
+ HashMap<Integer, String> expected2 = new HashMap<Integer, String>();
+ expected2.put(100, "2,2");
+ expected2.put(200, "1,1");
+ expected2.put(300, "3,3");
+ expected2.put(400, "1,1");
+
+
+ iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString()+","+t.get(2).toString());
+ }
+ }
+
+ public void testAccumWithNegative() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, -org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, -2);
+ expected.put(200, -1);
+ expected.put(300, -3);
+ expected.put(400, -1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithAdd() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");
+
+ {
+ HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+ expected.put(100, 3.0);
+ expected.put(200, 2.0);
+ expected.put(300, 4.0);
+ expected.put(400, 2.0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));
+ }
+ }
+
+ {
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Integer>expected = new HashMap<Integer, Integer>();
+ expected.put(100, 4);
+ expected.put(200, 2);
+ expected.put(300, 6);
+ expected.put(400, 2);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+ }
+
+ public void testAccumWithMinus() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");
+
+ HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
+ expected.put(100, 4.0);
+ expected.put(200, 2.0);
+ expected.put(300, 6.0);
+ expected.put(400, 2.0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));
+ }
+ }
+
+ public void testAccumWithMod() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 0);
+ expected.put(200, 1);
+ expected.put(300, 1);
+ expected.put(400, 1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithDivide() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 1);
+ expected.put(200, 0);
+ expected.put(300, 1);
+ expected.put(400, 0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithAnd() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 0);
+ expected.put(200, 1);
+ expected.put(300, 1);
+ expected.put(400, 1);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithOr() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
+ "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 1);
+ expected.put(200, 0);
+ expected.put(300, 1);
+ expected.put(400, 0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithRegexp() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 1);
+ expected.put(200, 0);
+ expected.put(300, 1);
+ expected.put(400, 0);
+
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+
+ public void testAccumWithIsNull() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B generate group, " +
+ "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 0);
+ expected.put(200, 0);
+ expected.put(300, 1);
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithDistinct() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+ pigServer.registerQuery("B = group A by id;");
+ pigServer.registerQuery("C = foreach B { D = distinct A; generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");
+
+ HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
+ expected.put(100, 2);
+ expected.put(200, 2);
+ expected.put(300, 3);
+ expected.put(400, 2);
+
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));
+ }
+ }
+
+ public void testAccumWithSort() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
+ pigServer.registerQuery("B = foreach A generate id, f, id as t;");
+ pigServer.registerQuery("C = group B by id;");
+ pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");
+
+ HashMap<Integer, String> expected = new HashMap<Integer, String>();
+ expected.put(100, "(apple)(apple)");
+ expected.put(200, "(orange)");
+ expected.put(300, "(pear)(pear)(strawberry)");
+ expected.put(400, "(apple)");
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));
+ }
+ }
+
+ public void testAccumWithBuildin() throws IOException{
+ pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
+ pigServer.registerQuery("C = group A by id;");
+ pigServer.registerQuery("D = foreach C generate group, SUM(A.v), AVG(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");
+
+ HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>();
+ expected.put(100, new Double[]{15.0,3.0,5.0,1.0,5.0});
+ expected.put(200, new Double[]{6.3,2.1,3.0,1.1,3.1});
+ expected.put(300, new Double[]{3.3,3.3,1.0,3.3,3.3});
+
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+
+ while(iter.hasNext()) {
+ Tuple t = iter.next();
+ Double[] v = expected.get((Integer)t.get(0));
+ for(int i=0; i<v.length; i++) {
+ assertEquals(v[i].doubleValue(), ((Number)t.get(i+1)).doubleValue(), 0.0001);
+ }
+ }
+ }
+}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCombiner.java Tue Nov 24 19:54:19 2009
@@ -33,6 +33,7 @@
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
@@ -366,4 +367,53 @@
}
}
+ public static class JiraPig1030 extends EvalFunc<String> {
+
+ public String exec(Tuple input) throws IOException {
+ return "";
+ }
+ }
+
+ @Test
+ public void testJiraPig1030() {
+ // test that combiner is NOT invoked when
+ // one of the elements in the foreach generate
+ // has a non-algebraic UDF that have multiple inputs
+ // (one of them is distinct).
+
+ String input[] = {
+ "pig1\t18\t2.1",
+ "pig2\t24\t3.3",
+ "pig5\t45\t2.4",
+ "pig1\t18\t2.1",
+ "pig1\t19\t2.1",
+ "pig2\t24\t4.5",
+ "pig1\t20\t3.1" };
+
+ try {
+ Util.createInputFile(cluster, "forEachNoCombinerInput.txt", input);
+ PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.registerQuery("a = load 'forEachNoCombinerInput.txt' as (name:chararray, age:int, gpa:double);");
+ pigServer.registerQuery("b = group a all;");
+ pigServer.registerQuery("c = foreach b {" +
+ " d = distinct a.age;" +
+ " generate group, " + JiraPig1030.class.getName() + "(d, 0);};");
+
+ // make sure there isn't a combine plan in the explain output
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ pigServer.explain("c", ps);
+ assertFalse(baos.toString().matches("(?si).*combine plan.*"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail();
+ } finally {
+ try {
+ Util.deleteFile(cluster, "forEachNoCombinerInput.txt");
+ } catch (IOException e) {
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+ }
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestCounters.java Tue Nov 24 19:54:19 2009
@@ -538,11 +538,13 @@
File out = File.createTempFile("output", ".txt");
out.delete();
PigServer pigServer = new PigServer("local");
+ // FileLocalizer is initialized before using HDFS by previous tests
+ FileLocalizer.setInitialized(false);
pigServer.registerQuery("a = load '" + Util.encodeEscape(file.toString()) + "';");
pigServer.registerQuery("b = order a by $0;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
- PigStats pigStats = pigServer.store("d", out.getAbsolutePath()).getStatistics();
+ PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
long filesize = 0;
while(is.read() != -1) filesize++;
@@ -552,8 +554,8 @@
//Map<String, Map<String, String>> stats = pigStats.getPigStats();
- assertEquals(count, pigStats.getRecordsWritten());
- assertEquals(filesize, pigStats.getBytesWritten());
+ assertEquals(10, pigStats.getRecordsWritten());
+ assertEquals(110, pigStats.getBytesWritten());
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestFRJoin.java Tue Nov 24 19:54:19 2009
@@ -22,8 +22,11 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
+import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import junit.framework.Assert;
import junit.framework.TestCase;
@@ -56,6 +59,7 @@
public class TestFRJoin extends TestCase{
private static final String INPUT_FILE = "testFrJoinInput.txt";
+ private static final String INPUT_FILE2 = "testFrJoinInput2.txt";
private PigServer pigServer;
private MiniCluster cluster = MiniCluster.buildCluster();
private File tmpFile;
@@ -77,11 +81,21 @@
input[k++] = si + "\t" + j;
}
Util.createInputFile(cluster, INPUT_FILE, input);
+
+ String[] input2 = new String[2*(LOOP_SIZE/2)];
+ k = 0;
+ for(int i = 1; i <= LOOP_SIZE/2; i++) {
+ String si = i + "";
+ for(int j=1;j<=LOOP_SIZE/2;j++)
+ input2[k++] = si + "\t" + j;
+ }
+ Util.createInputFile(cluster, INPUT_FILE2, input2);
}
@After
public void tearDown() throws Exception {
Util.deleteFile(cluster, INPUT_FILE);
+ Util.deleteFile(cluster, INPUT_FILE2 );
}
public static class FRJoin extends EvalFunc<DataBag>{
@@ -408,8 +422,83 @@
Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
}
+
+ @Test
+ public void testFRJoinOut8() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+ Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0 using \"replicated\";");
+ pigServer.registerQuery("D = join A by $1 left, B by $1 using \"replicated\";");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashFRJoin.put( Key, tuple);
+ dbfrj.add(tuple);
+
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0;");
+ pigServer.registerQuery("D = join A by $1 left, B by $1;");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashJoin.put( Key, tuple);
+ dbshj.add(tuple);
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
@Test
+ public void testFRJoinOut9() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+ Map<String,Tuple> hashFRJoin = new HashMap<String,Tuple>();
+ Map<String,Tuple> hashJoin = new HashMap<String,Tuple>();
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0 using \"repl\";");
+ pigServer.registerQuery("D = join A by $1 left, B by $1 using \"repl\";");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashFRJoin.put( Key, tuple);
+ dbfrj.add(tuple);
+
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by $0 left, B by $0;");
+ pigServer.registerQuery("D = join A by $1 left, B by $1;");
+ pigServer.registerQuery("E = union C,D;");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ while(iter.hasNext()) {
+ Tuple tuple = iter.next();
+ String Key = tuple.toDelimitedString(",");
+ hashJoin.put( Key, tuple);
+ dbshj.add(tuple);
+ }
+ }
+ Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ }
+
+ @Test
public void testFRJoinSch1() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Tue Nov 24 19:54:19 2009
@@ -78,12 +78,13 @@
t = it.next();
count[i] = (Long)t.get(0);
}
-
+
Assert.assertFalse(it.hasNext());
- Assert.assertEquals(3L, count[0]);
+ // Pig's previous local mode was screwed up correcting that
+ Assert.assertEquals(5L, count[0]);
Assert.assertEquals(5L, count[1]);
- Assert.assertEquals(5L, count[2]);
+ Assert.assertEquals(3L, count[2]);
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestHBaseStorage.java Tue Nov 24 19:54:19 2009
@@ -16,19 +16,27 @@
*/
package org.apache.pig.test;
+import java.io.File;
import java.io.IOException;
import java.util.Iterator;
+import junit.framework.TestCase;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.pig.ExecType;
@@ -37,11 +45,10 @@
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
import org.junit.Before;
import org.junit.Test;
-import junit.framework.TestCase;
-
/** {@link org.apache.pig.backend.hadoop.hbase.HBaseStorage} Test Case **/
public class TestHBaseStorage extends TestCase {
@@ -51,6 +58,7 @@
private MiniCluster cluster = MiniCluster.buildCluster();
private HBaseConfiguration conf;
private MiniHBaseCluster hbaseCluster;
+ private MiniZooKeeperCluster zooKeeperCluster;
private PigServer pig;
@@ -70,8 +78,23 @@
@Override
protected void setUp() throws Exception {
super.setUp();
+
conf = new HBaseConfiguration(ConfigurationUtil.
toConfiguration(cluster.getProperties()));
+ conf.set("fs.default.name", cluster.getFileSystem().getUri().toString());
+ Path parentdir = cluster.getFileSystem().getHomeDirectory();
+ conf.set(HConstants.HBASE_DIR, parentdir.toString());
+
+ // Make the thread wake frequency a little slower so other threads
+ // can run
+ conf.setInt("hbase.server.thread.wakefrequency", 2000);
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+
+ // Increase the amount of time between client retries
+ conf.setLong("hbase.client.pause", 15 * 1000);
+
try {
hBaseClusterSetup();
} catch (Exception e) {
@@ -81,17 +104,28 @@
throw e;
}
- pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pig = new PigServer(ExecType.MAPREDUCE, ConfigurationUtil.toProperties(conf));
}
/**
* Actually start the MiniHBase instance.
*/
protected void hBaseClusterSetup() throws Exception {
+ zooKeeperCluster = new MiniZooKeeperCluster();
+ int clientPort = this.zooKeeperCluster.startup(new File("build/test"));
+ conf.set("hbase.zookeeper.property.clientPort",clientPort+"");
// start the mini cluster
hbaseCluster = new MiniHBaseCluster(conf, NUM_REGIONSERVERS);
// opening the META table ensures that cluster is running
- new HTable(conf, HConstants.META_TABLE_NAME);
+ while(true){
+ try{
+ new HTable(conf, HConstants.META_TABLE_NAME);
+ break;
+ }catch(IOException e){
+ Thread.sleep(1000);
+ }
+
+ }
}
@Override
@@ -108,6 +142,13 @@
LOG.warn("Closing mini hbase cluster", e);
}
}
+ if (zooKeeperCluster!=null){
+ try{
+ zooKeeperCluster.shutdown();
+ } catch (IOException e){
+ LOG.warn("Closing zookeeper cluster",e);
+ }
+ }
} catch (Exception e) {
LOG.error(e);
}
@@ -122,6 +163,7 @@
@Test
public void testLoadFromHBase() throws IOException, ExecException {
prepareTable();
+
pig.registerQuery("a = load 'hbase://" + TESTTABLE + "' using " +
"org.apache.pig.backend.hadoop.hbase.HBaseStorage('" + TESTCOLUMN_A +
" " + TESTCOLUMN_B + " " + TESTCOLUMN_C + "') as (col_a, col_b:int, col_c);");
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java Tue Nov 24 19:54:19 2009
@@ -456,7 +456,7 @@
lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
String[] types = new String[] { "left", "right", "full" };
- String[] joinTypes = new String[] { "replicated", "repl", "skewed", "merge" };
+ String[] joinTypes = new String[] { "replicated", "repl", "merge" };
for (int i = 0; i < types.length; i++) {
for(int j = 0; j < joinTypes.length; j++) {
boolean errCaught = false;
@@ -466,9 +466,20 @@
} catch(Exception e) {
errCaught = true;
- assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+ if( j == 0 || j == 1 ) {
+ // This after adding support of LeftOuter Join to replicated Join
+ assertEquals(true, e.getMessage().contains("does not support (right|full) outer joins"));
+ } else {
+ assertEquals(true, e.getMessage().contains("does not support (left|right|full) outer joins"));
+ }
+ }
+ if( i == 0 && ( j == 0 || j== 1 ) ) {
+ // This after adding support of LeftOuter Join to replicated Join
+ assertEquals(false, errCaught);
+ }
+ else {
+ assertEquals(true, errCaught);
}
- assertEquals(true, errCaught);
}
}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoad.java Tue Nov 24 19:54:19 2009
@@ -162,6 +162,36 @@
}
}
+ @Test
+ public void testCommaSeparatedString() throws Exception {
+ checkLoadPath("usr/pig/a,usr/pig/b","/tmp/usr/pig/a,/tmp/usr/pig/b");
+ }
+
+ @Test
+ public void testCommaSeparatedString2() throws Exception {
+ checkLoadPath("t?s*,test","/tmp/t?s*,/tmp/test");
+ }
+
+ @Test
+ public void testCommaSeparatedString3() throws Exception {
+ checkLoadPath("hdfs:/tmp/test,hdfs:/tmp/test2,hdfs:/tmp/test3","/tmp/test,/tmp/test2,/tmp/test3");
+ }
+
+ @Test
+ public void testCommaSeparatedString4() throws Exception {
+ checkLoadPath("usr/pig/{a,c},usr/pig/b","/tmp/usr/pig/{a,c},/tmp/usr/pig/b");
+ }
+
+ @Test
+ public void testCommaSeparatedString5() throws Exception {
+ checkLoadPath("/usr/pig/{a,c},usr/pig/b","/usr/pig/{a,c},/tmp/usr/pig/b");
+ }
+
+ @Test
+ public void testCommaSeparatedString6() throws Exception {
+ checkLoadPath("usr/pig/{a,c},/usr/pig/b","/tmp/usr/pig/{a,c},/usr/pig/b");
+ }
+
private void checkLoadPath(String orig, String expected) throws Exception {
pc.getProperties().setProperty("opt.multiquery", "" + true);