You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/02/27 01:52:08 UTC
svn commit: r631443 [5/10] - in /incubator/pig/branches/types: ./
lib-src/bzip2/org/apache/tools/bzip2r/ lib-src/shock/org/apache/pig/shock/
lib/ scripts/ src/org/apache/pig/ src/org/apache/pig/backend/
src/org/apache/pig/backend/datastorage/ src/org/a...
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.executionengine.ExecLogicalPlan;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecScopedLogicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.io.FileSpec;
+import java.util.Iterator;
+
+
+/**
+ * Execution engine that compiles logical plans into physical operators of
+ * this package and runs them synchronously inside the calling thread.
+ *
+ * The engine keeps three tables: a mapping from logical root keys to the
+ * physical keys compiled for them, the table of physical operators (handed to
+ * every operator the engine creates), and the results already materialized
+ * for a logical key.
+ */
+public class LocalExecutionEngine implements ExecutionEngine {
+
+    protected PigContext pigContext;
+    protected DataStorage ds;
+    protected NodeIdGenerator nodeIdGenerator;
+
+    // key: the operator key from the logical plan that originated the physical plan
+    // val: the operator key for the root of the physical plan
+    protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
+
+    // table passed to every physical operator created by this engine and to
+    // every LocalPhysicalPlan it returns
+    protected Map<OperatorKey, ExecPhysicalOperator> physicalOpTable;
+
+    // map from LOGICAL key to info about the execution
+    protected Map<OperatorKey, LocalResult> materializedResults;
+
+    /**
+     * Creates an engine bound to the given context; data storage is taken
+     * from {@code pigContext.getLfs()}.
+     */
+    public LocalExecutionEngine(PigContext pigContext) {
+        this.pigContext = pigContext;
+        this.ds = pigContext.getLfs();
+        this.nodeIdGenerator = NodeIdGenerator.getGenerator();
+        this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
+        this.physicalOpTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
+        this.materializedResults = new HashMap<OperatorKey, LocalResult>();
+    }
+
+    public DataStorage getDataStorage() {
+        return this.ds;
+    }
+
+    public void init() throws ExecException {
+        // nothing to initialize
+    }
+
+    public void close() throws ExecException {
+        // nothing to release
+    }
+
+    /** Returns a fresh, empty set of properties on every call. */
+    public Properties getConfiguration() throws ExecException {
+        return new Properties();
+    }
+
+    public void updateConfiguration(Properties newConfiguration)
+            throws ExecException {
+        // the new configuration is ignored
+    }
+
+    public Map<String, Object> getStatistics() throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Compiles a single logical plan.
+     *
+     * @throws ExecException if {@code plan} is null or compilation fails
+     */
+    public LocalPhysicalPlan compile(ExecLogicalPlan plan,
+                                     Properties properties)
+            throws ExecException {
+        if (plan == null) {
+            throw new ExecException("No Plan to compile");
+        }
+
+        return compile(new ExecLogicalPlan[]{ plan }, properties);
+    }
+
+    /**
+     * Compiles every plan in {@code plans}, reusing the physical key already
+     * recorded for a logical root when there is one.
+     *
+     * @return a physical plan rooted at the key compiled for the LAST entry
+     *         of {@code plans}
+     * @throws ExecException if {@code plans} is null or empty, if one of its
+     *         entries is null, or if compilation fails
+     */
+    public LocalPhysicalPlan compile(ExecLogicalPlan[] plans,
+                                     Properties properties)
+            throws ExecException {
+        // An empty array used to yield a plan with a null root that failed
+        // later with a NullPointerException; reject it up front instead.
+        if (plans == null || plans.length == 0) {
+            throw new ExecException("No Plans to compile");
+        }
+
+        OperatorKey physicalKey = null;
+        for (int i = 0; i < plans.length; ++i) {
+            ExecLogicalPlan curPlan = plans[i];
+            if (curPlan == null) {
+                throw new ExecException("No Plan to compile");
+            }
+
+            OperatorKey logicalKey = curPlan.getRoot();
+
+            physicalKey = logicalToPhysicalKeys.get(logicalKey);
+
+            if (physicalKey == null) {
+                physicalKey = doCompile(logicalKey,
+                                        curPlan.getOpTable(),
+                                        properties);
+
+                logicalToPhysicalKeys.put(logicalKey, physicalKey);
+            }
+        }
+
+        return new LocalPhysicalPlan(physicalKey, physicalOpTable);
+    }
+
+    /**
+     * Runs the plan to completion in the calling thread and collects every
+     * tuple produced by its root operator.
+     *
+     * @return a job in state COMPLETED holding the collected tuples
+     * @throws ExecException if the plan is null, its root is not present in
+     *         the physical operator table, or an operator raises an IOException
+     */
+    public LocalJob execute(ExecPhysicalPlan plan) throws ExecException {
+        if (plan == null) {
+            throw new ExecException("No Plan to execute");
+        }
+
+        PhysicalOperator pp = (PhysicalOperator) physicalOpTable.get(plan.getRoot());
+        if (pp == null) {
+            throw new ExecException("No physical operator found for plan root "
+                                    + plan.getRoot());
+        }
+
+        DataBag results = BagFactory.getInstance().newDefaultBag();
+        try {
+            pp.open();
+            try {
+                Tuple t;
+                while ((t = (Tuple) pp.getNext()) != null) {
+                    results.add(t);
+                }
+            }
+            finally {
+                // close even when getNext() fails, so an opened operator is
+                // never left open
+                pp.close();
+            }
+        }
+        catch (IOException e) {
+            throw new ExecException(e);
+        }
+
+        return new LocalJob(results, JOB_STATUS.COMPLETED);
+    }
+
+    public LocalJob submit(ExecPhysicalPlan plan) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Always returns a new, empty collection. */
+    public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
+        return new HashSet<ExecJob>();
+    }
+
+    public Collection<String> activeScopes() throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void reclaimScope(String scope) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Recursively compiles the logical operator identified by
+     * {@code logicalKey} and, unless the operator wired its own input, all of
+     * its inputs.
+     *
+     * When a materialized result exists for the key, a POLoad over that
+     * result's file spec is created instead of recompiling the sub-plan.
+     *
+     * @return the key of the physical operator created for {@code logicalKey}
+     */
+    private OperatorKey doCompile(OperatorKey logicalKey,
+                                  Map<OperatorKey, LogicalOperator> logicalOpTable,
+                                  Properties properties)
+            throws ExecException {
+
+        LocalResult materializedResult = materializedResults.get(logicalKey);
+
+        if (materializedResult != null) {
+            ExecPhysicalOperator pp = new POLoad(logicalKey.getScope(),
+                                                 nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
+                                                 physicalOpTable,
+                                                 pigContext,
+                                                 materializedResult.outFileSpec,
+                                                 LogicalOperator.FIXED);
+
+            return new OperatorKey(pp.getScope(), pp.getId());
+        }
+
+        // filled in by compileOperator
+        OperatorKey physicalKey = new OperatorKey();
+
+        if (compileOperator(logicalKey, logicalOpTable, properties, physicalKey)) {
+            LogicalOperator lo = logicalOpTable.get(logicalKey);
+
+            for (int i = 0; i < lo.getInputs().size(); ++i) {
+                // presumably the operator put itself into physicalOpTable
+                // when it was constructed -- this class never does so itself
+                ((PhysicalOperator) physicalOpTable.get(physicalKey)).inputs[i] =
+                    doCompile(lo.getInputs().get(i), logicalOpTable, properties);
+            }
+        }
+
+        return physicalKey;
+    }
+
+    /**
+     * Creates the physical operator matching the logical operator stored
+     * under {@code logicalKey} and records the logical-to-physical mapping.
+     *
+     * @param physicalKey out-parameter; its scope and id are overwritten with
+     *        those of the newly created physical operator
+     * @return true when the caller still has to compile and wire the inputs;
+     *         false when the operator's input was compiled here (split output)
+     * @throws ExecException if the key is unknown or the operator type is
+     *         not supported
+     */
+    private boolean compileOperator(OperatorKey logicalKey,
+                                    Map<OperatorKey, LogicalOperator> logicalOpTable,
+                                    Properties properties,
+                                    OperatorKey physicalKey)
+            throws ExecException {
+        ExecPhysicalOperator pp;
+        LogicalOperator lo = logicalOpTable.get(logicalKey);
+        if (lo == null) {
+            // previously surfaced as a NullPointerException on lo.getScope()
+            throw new ExecException("Internal error: No logical operator for key "
+                                    + logicalKey);
+        }
+
+        String scope = lo.getScope();
+        boolean compileInputs = true;
+
+        if (lo instanceof LOEval) {
+            pp = new POEval(scope,
+                            nodeIdGenerator.getNextNodeId(scope),
+                            physicalOpTable,
+                            ((LOEval) lo).getSpec(),
+                            lo.getOutputType());
+        }
+        else if (lo instanceof LOCogroup) {
+            pp = new POCogroup(scope,
+                               nodeIdGenerator.getNextNodeId(scope),
+                               physicalOpTable,
+                               ((LOCogroup) lo).getSpecs(),
+                               lo.getOutputType());
+        }
+        else if (lo instanceof LOLoad) {
+            pp = new POLoad(scope,
+                            nodeIdGenerator.getNextNodeId(scope),
+                            physicalOpTable,
+                            pigContext,
+                            ((LOLoad) lo).getInputFileSpec(),
+                            lo.getOutputType());
+        }
+        else if (lo instanceof LOSplitOutput) {
+            LOSplitOutput loso = (LOSplitOutput) lo;
+            LOSplit los = ((LOSplit) (logicalOpTable.get(loso.getInputs().get(0))));
+
+            // the physical split reads directly from the input of the
+            // logical split, which is compiled right here
+            pp = new POSplit(scope,
+                             nodeIdGenerator.getNextNodeId(scope),
+                             physicalOpTable,
+                             doCompile(los.getInputs().get(0),
+                                       logicalOpTable,
+                                       properties),
+                             los.getConditions(),
+                             loso.getReadFrom(),
+                             lo.getOutputType());
+
+            compileInputs = false;
+        }
+        else if (lo instanceof LOStore) {
+            pp = new POStore(scope,
+                             nodeIdGenerator.getNextNodeId(scope),
+                             physicalOpTable,
+                             lo.getInputs().get(0),
+                             materializedResults,
+                             ((LOStore) lo).getOutputFileSpec(),
+                             ((LOStore) lo).isAppend(),
+                             pigContext);
+        }
+        else if (lo instanceof LOUnion) {
+            pp = new POUnion(scope,
+                             nodeIdGenerator.getNextNodeId(scope),
+                             physicalOpTable,
+                             ((LOUnion) lo).getInputs().size(),
+                             lo.getOutputType());
+        }
+        else if (lo instanceof LOSort) {
+            pp = new POSort(scope,
+                            nodeIdGenerator.getNextNodeId(scope),
+                            physicalOpTable,
+                            ((LOSort) lo).getSortSpec(),
+                            lo.getOutputType());
+        }
+        else {
+            throw new ExecException("Internal error: Unknown logical operator.");
+        }
+
+        physicalKey.scope = pp.getScope();
+        physicalKey.id = pp.getId();
+
+        logicalToPhysicalKeys.put(logicalKey, physicalKey);
+
+        return compileInputs;
+    }
+}
+
+
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalJob.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+
+/**
+ * Job handle returned by the local execution engine. It simply wraps the bag
+ * of result tuples and the status it was constructed with.
+ */
+public class LocalJob implements ExecJob {
+
+    protected DataBag results;
+    protected JOB_STATUS status;
+
+    /**
+     * @param results bag holding the tuples produced by the job
+     * @param status  status reported by {@link #getStatus()}
+     */
+    public LocalJob(DataBag results, JOB_STATUS status) {
+        this.status = status;
+        this.results = results;
+    }
+
+    public JOB_STATUS getStatus() {
+        return this.status;
+    }
+
+    /** Always reports completion, whatever the stored status is. */
+    public boolean hasCompleted() throws ExecException {
+        return true;
+    }
+
+    /** Returns an iterator over the wrapped result bag. */
+    public Iterator<Tuple> getResults() throws ExecException {
+        return results.iterator();
+    }
+
+    /** Returns a fresh, empty set of properties on every call. */
+    public Properties getContiguration() {
+        return new Properties();
+    }
+
+    // ---- everything below is not supported for local jobs ----
+
+    public Map<String, Object> getStatistics() {
+        throw new UnsupportedOperationException();
+    }
+
+    public void completionNotification(Object cookie) {
+        throw new UnsupportedOperationException();
+    }
+
+    public void kill() throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void getLogs(OutputStream log) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void getSTDOut(OutputStream out) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    public void getSTDError(OutputStream error) throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPhysicalPlan.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.Properties;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.physicalLayer.POPrinter;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+/**
+ * Physical plan produced by the local execution engine: the key of the root
+ * operator plus the table in which that key is looked up.
+ */
+public class LocalPhysicalPlan implements ExecPhysicalPlan {
+    private static final long serialVersionUID = 1;
+
+    protected OperatorKey root;
+    protected Map<OperatorKey, ExecPhysicalOperator> opTable;
+
+    /**
+     * @param root    key of the plan's root operator
+     * @param opTable table used to resolve operator keys
+     */
+    LocalPhysicalPlan(OperatorKey root,
+                      Map<OperatorKey, ExecPhysicalOperator> opTable) {
+        this.opTable = opTable;
+        this.root = root;
+    }
+
+    /** Not supported by local plans. */
+    public Properties getConfiguration() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Not supported by local plans. */
+    public void updateConfiguration(Properties configuration)
+            throws ExecException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Sends a POPrinter, writing to {@code out}, through the root operator.
+     */
+    public void explain(OutputStream out) {
+        PrintStream sink = new PrintStream(out);
+        POVisitor printer = new POPrinter(opTable, sink);
+
+        PhysicalOperator rootOperator = (PhysicalOperator) opTable.get(root);
+        rootOperator.visit(printer);
+    }
+
+    public Map<OperatorKey, ExecPhysicalOperator> getOpTable() {
+        return this.opTable;
+    }
+
+    public OperatorKey getRoot() {
+        return this.root;
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalResult.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.util.Iterator;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileSpec;
+
+/**
+ * Record describing a result that has been written out: for now only the
+ * file spec it was stored under.
+ *
+ * Could record other information like when the file was first created or how
+ * many times it has been re-used (in this context we are inside the same
+ * scope).
+ */
+public class LocalResult {
+
+    /** File spec of the stored result. */
+    public FileSpec outFileSpec;
+
+    /**
+     * @param outFileSpec file spec of the stored result
+     */
+    public LocalResult(FileSpec outFileSpec) {
+        this.outFileSpec = outFileSpec;
+    }
+}
+
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POCogroup.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.AmendableTuple;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+
+// n-ary, blocking operator. Output has schema: < group_label, { <1>, <2>, ... }, { <a>, <b>, ... } >
+//
+// open() drains every input through its EvalSpec and sorts the resulting
+// (group, tuple) pairs by group; getNext() then merges the sorted inputs,
+// emitting one tuple per distinct group.
+public class POCogroup extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    // one list per input, holding the pairs returned by
+    // LOCogroup.getGroupAndTuple: element [0] is the group, element [1] the
+    // tuple. Sorted by group in open() and consumed from the head by getNext().
+    List<Object[]>[] sortedInputs;
+
+    // one spec per input, in input order
+    List<EvalSpec> specs;
+
+    /**
+     * Creates a cogroup with one (not yet wired) input slot per spec.
+     *
+     * @param specs one EvalSpec per input
+     */
+    public POCogroup(String scope,
+                     long id,
+                     Map<OperatorKey, ExecPhysicalOperator> opTable,
+                     List<EvalSpec> specs,
+                     int outputType) {
+        super(scope, id, opTable, outputType);
+        this.inputs = new OperatorKey[specs.size()];
+        this.specs = specs;
+    }
+
+    // drain all inputs, and sort each by group (remember, this is a blocking operator!)
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean open() throws IOException {
+        if (!super.open())
+            return false;
+
+        // generic arrays cannot be created directly, hence the raw type
+        sortedInputs = new List[inputs.length];
+
+        for (int i = 0; i < inputs.length; i++) {
+
+            final int finalI = i;
+
+            // getNext() repeatedly removes the head of this list. With an
+            // ArrayList every such removal shifts all remaining elements,
+            // which makes draining an input quadratic; a linked list removes
+            // its head in constant time.
+            sortedInputs[i] = new java.util.LinkedList<Object[]>();
+
+            DataCollector outputFromSpec = new DataCollector(null) {
+                @Override
+                public void add(Object d) {
+                    sortedInputs[finalI].add(LOCogroup.getGroupAndTuple(d));
+                }
+            };
+
+            DataCollector inputToSpec = specs.get(i).setupPipe(outputFromSpec);
+
+            Tuple t;
+            while ((t = (Tuple) ((PhysicalOperator) opTable.get(inputs[i])).getNext()) != null) {
+                inputToSpec.add(t);
+            }
+            inputToSpec.finishPipe();
+
+            Collections.sort(sortedInputs[i], new Comparator<Object[]>() {
+                public int compare(Object[] a, Object[] b) {
+                    return DataType.compare(a[0], b[0]);
+                }
+            });
+        }
+
+        return true;
+    }
+
+    /**
+     * Returns the tuple for the next group, or null once all inputs are
+     * exhausted. Field 0 of the tuple is the group, field 1 + i the bag of
+     * tuples that input i contributed to that group.
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+
+        while (true) { // loop until we find a tuple we're allowed to output (or we hit the end)
+
+            // find the smallest group among all inputs (this is the group we should make a tuple
+            // out of)
+            Object smallestGroup = null;
+            for (int i = 0; i < inputs.length; i++) {
+                if (!sortedInputs[i].isEmpty()) {
+                    Object g = (sortedInputs[i].get(0))[0];
+                    if (smallestGroup == null || DataType.compare(g, smallestGroup) < 0)
+                        smallestGroup = g;
+                }
+            }
+
+            if (smallestGroup == null)
+                return null; // we hit the end of the groups, so we're done
+
+            // find all tuples in each input pertaining to the group of interest, and combine the
+            // data into a single tuple
+
+            Tuple output;
+            if (outputType == LogicalOperator.AMENDABLE)
+                output = new AmendableTuple(1 + inputs.length, smallestGroup);
+            else
+                output = TupleFactory.getInstance().newTuple(1 + inputs.length);
+
+            // set first field to the group tuple
+            output.set(0, smallestGroup);
+
+            if (lineageTracer != null) lineageTracer.insert(output);
+
+            boolean done = true;
+            for (int i = 0; i < inputs.length; i++) {
+                DataBag b = BagFactory.getInstance().newDefaultBag();
+
+                while (!sortedInputs[i].isEmpty()) {
+                    Object[] head = sortedInputs[i].get(0);
+                    Object g = head[0];
+
+                    Tuple t = (Tuple) head[1];
+
+                    int c = DataType.compare(g, smallestGroup);
+                    if (c < 0) {
+                        sortedInputs[i].remove(0); // discard this tuple
+                    } else if (c == 0) {
+                        b.add(t);
+                        if (lineageTracer != null) lineageTracer.union(t, output); // update lineage
+                        sortedInputs[i].remove(0);
+                    } else {
+                        break;
+                    }
+                }
+
+                if (specs.get(i).isInner() && b.size() == 0)
+                    done = false; // this input uses "inner" semantics, and it has no tuples for
+                                  // this group, so suppress the tuple we're currently building
+
+                output.set(1 + i, b);
+            }
+
+            if (done)
+                return output;
+        }
+
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitCogroup(this);
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POEval.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.util.DataBuffer;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+// unary, non-blocking operator: pushes each input tuple through the eval
+// pipeline built from its spec and hands out whatever the pipeline emits.
+public class POEval extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    EvalSpec spec;
+
+    // receives the output of the eval pipeline; getNext() serves from it
+    private DataBuffer buf = null;
+    private DataCollector evalPipeline = null;
+
+    // set once the input has returned null and the pipeline was finished
+    private boolean inputDrained;
+
+    Tuple lastAdded = null; // for lineage bookkeeping
+
+    /**
+     * Creates an eval operator reading from {@code input}.
+     */
+    public POEval(String scope,
+                  long id,
+                  Map<OperatorKey, ExecPhysicalOperator> opTable,
+                  OperatorKey input,
+                  EvalSpec specIn,
+                  int outputType) {
+        super(scope, id, opTable, outputType);
+        inputs = new OperatorKey[] { input };
+        spec = specIn;
+    }
+
+    /**
+     * Creates an eval operator whose single input slot is left null, to be
+     * filled in later.
+     */
+    public POEval(String scope,
+                  long id,
+                  Map<OperatorKey, ExecPhysicalOperator> opTable,
+                  EvalSpec specIn,
+                  int outputType) {
+        this(scope, id, opTable, null, specIn, outputType);
+    }
+
+    /**
+     * Lazily creates the output buffer and the eval pipeline (both are kept
+     * across calls) and marks the input as not yet drained.
+     */
+    @Override
+    public boolean open() throws IOException {
+        if (!super.open()) {
+            return false;
+        }
+
+        if (buf == null) {
+            buf = new DataBuffer();
+        }
+        if (evalPipeline == null) {
+            evalPipeline = spec.setupPipe(buf);
+        }
+
+        inputDrained = false;
+        return true;
+    }
+
+    /**
+     * Returns the next pipeline output, feeding input tuples into the
+     * pipeline as long as the buffer is empty; returns null once the input
+     * is drained and the buffer has been emptied.
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        // no pending outputs: look to the input to provide more food
+        while (buf.isEmpty()) {
+            if (inputDrained) {
+                return null;
+            }
+
+            Tuple fed = ((PhysicalOperator) opTable.get(inputs[0])).getNext();
+
+            if (fed != null) {
+                evalPipeline.add(fed);
+                lastAdded = fed; // for lineage bookkeeping
+            } else {
+                inputDrained = true;
+                evalPipeline.finishPipe();
+            }
+        }
+
+        Tuple produced = (Tuple) buf.removeFirst();
+        if (lineageTracer != null) {
+            lineageTracer.insert(produced);
+            // update lineage (assumes one-to-many relationship between tuples
+            // added to pipeline and output!!)
+            if (lastAdded != null) {
+                lineageTracer.union(lastAdded, produced);
+            }
+        }
+        return produced;
+    }
+
+    public void visit(POVisitor v) {
+        v.visitEval(this);
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POLoad.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+
+/**
+ * Leaf operator (it has no inputs) that reads tuples from a file through the
+ * load function named in the file spec.
+ */
+public class POLoad extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    String filename;
+    LoadFunc lf;
+
+    // true while the load function is bound to an input stream
+    boolean bound = false;
+
+    PigContext pigContext;
+
+    /**
+     * @param fileSpec supplies both the file name and the spec of the load
+     *        function to instantiate
+     * @throws RuntimeException wrapping any exception raised while the load
+     *         function is instantiated
+     */
+    public POLoad(String scope,
+                  long id,
+                  Map<OperatorKey, ExecPhysicalOperator> opTable,
+                  PigContext pigContext,
+                  FileSpec fileSpec,
+                  int outputType) {
+        super(scope, id, opTable, outputType);
+        this.pigContext = pigContext;
+        inputs = new OperatorKey[0];
+
+        filename = fileSpec.getFileName();
+        try {
+            lf = (LoadFunc) PigContext.instantiateFuncFromSpec(fileSpec.getFuncSpec());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Binds the load function to the whole file (offset 0 up to
+     * Long.MAX_VALUE) unless it is already bound. Always returns true.
+     */
+    @Override
+    public boolean open() throws IOException {
+        if (bound) {
+            return true;
+        }
+
+        BufferedPositionedInputStream in =
+            new BufferedPositionedInputStream(FileLocalizer.open(filename, pigContext));
+        lf.bindTo(filename, in, 0, Long.MAX_VALUE);
+        bound = true;
+        return true;
+    }
+
+    /**
+     * Marks the operator as unbound so that the next open() binds again.
+     */
+    @Override
+    public void close() throws IOException {
+        super.close();
+        // NOTE(review): the stream created in open() is not closed here;
+        // confirm whether the load function or super.close() takes care of it.
+        bound = false;
+    }
+
+    /** Returns whatever the load function returns next. */
+    @Override
+    public Tuple getNext() throws IOException {
+        return lf.getNext();
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitLoad(this);
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSort.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.EvalSpec;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+
+/**
+ * Physical operator that sorts its single input. All input tuples are
+ * drained into a sorted bag when the operator is opened; {@link #getNext()}
+ * then walks that bag.
+ */
+public class POSort extends PhysicalOperator {
+    static final long serialVersionUID = 1L;
+
+    /** Ordering handed to the sorted bag created in {@link #open()}. */
+    EvalSpec sortSpec;
+    /** Cursor over the sorted bag; assigned by {@link #open()}, never serialized. */
+    transient Iterator<Tuple> iter;
+
+
+    /**
+     * Creates a sort operator with one input slot; the slot itself is left
+     * unset here.
+     */
+    public POSort(String scope,
+                  long id,
+                  Map<OperatorKey, ExecPhysicalOperator> opTable,
+                  EvalSpec sortSpec,
+                  int outputType) {
+        super(scope, id, opTable, outputType);
+        this.inputs = new OperatorKey[1];
+        this.sortSpec = sortSpec;
+    }
+
+    /**
+     * Opens the operator and reads the whole input into a sorted bag.
+     *
+     * @return {@code false} if the superclass refuses to open, otherwise
+     *         {@code true} once the input has been consumed
+     */
+    @Override
+    public boolean open() throws IOException {
+        if (super.open()) {
+            DataBag sorted = BagFactory.getInstance().newSortedBag(sortSpec);
+
+            for (;;) {
+                Tuple next = ((PhysicalOperator) opTable.get(inputs[0])).getNext();
+                if (next == null) {
+                    break;
+                }
+                sorted.add(next);
+            }
+            iter = sorted.iterator();
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * @return the next tuple from the sorted bag, or {@code null} when the
+     *         bag is exhausted
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        return iter.hasNext() ? iter.next() : null;
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitSort(this);
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POSplit.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.cond.Cond;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+
+/**
+ * Physical operator for one branch of a split: it forwards only those
+ * input tuples for which the condition at index {@link #readFromCond}
+ * evaluates to true.
+ */
+public class POSplit extends PhysicalOperator {
+    static final long serialVersionUID = 1L;
+    /** All conditions of the split; only one of them is used by this operator. */
+    protected ArrayList<Cond> conditions;
+    /** Index into {@link #conditions} of the condition this operator applies. */
+    protected int readFromCond;
+
+    /**
+     * @param input        key of the single input operator
+     * @param conditions   the split's conditions
+     * @param readFromCond index of the condition that selects this branch
+     */
+    public POSplit(String scope,
+                   long id,
+                   Map<OperatorKey, ExecPhysicalOperator> opTable,
+                   OperatorKey input,
+                   ArrayList<Cond> conditions,
+                   int readFromCond,
+                   int outputType) {
+        super(scope, id, opTable, outputType);
+
+        this.inputs = new OperatorKey[1];
+        this.inputs[0] = input;
+
+        this.conditions = conditions;
+        this.readFromCond = readFromCond;
+    }
+
+    /**
+     * Opens this operator and then its input.
+     *
+     * @return {@code false} if the superclass or the input refuses to open
+     */
+    @Override
+    public boolean open() throws IOException {
+        if (!super.open()) {
+            return false;
+        }
+
+        // NOTE(review): POSort and POUnion in this package read their inputs
+        // after calling only super.open(), which suggests the superclass
+        // already opens the inputs. If so, this opens the input a second
+        // time -- confirm against PhysicalOperator.open().
+        return ((PhysicalOperator) opTable.get(this.inputs[0])).open();
+    }
+
+    /**
+     * Pulls tuples from the input until one satisfies the selected condition.
+     *
+     * @return the first matching tuple, or {@code null} once the input is
+     *         exhausted
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        Tuple nextTuple = null;
+
+        while ((nextTuple = ((PhysicalOperator) opTable.get(this.inputs[0])).getNext()) != null) {
+            boolean emitTuple = this.conditions.get(this.readFromCond).eval(nextTuple);
+
+            if (emitTuple) {
+                break;
+            }
+        }
+
+        return nextTuple;
+    }
+
+    // Annotated like the visit() overrides of the sibling operators.
+    @Override
+    public void visit(POVisitor v) {
+        v.visitSplit(this);
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POStore.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.io.PigFile;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+
+
+/**
+ * Physical operator that drains its single input into a bag, stores the
+ * bag to a file with a {@link StoreFunc}, and records the stored file as
+ * the materialized result of a logical operator.
+ */
+public class POStore extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+
+    private final Log log = LogFactory.getLog(getClass());
+
+    /** Target file, created from the output FileSpec's file name. */
+    private PigFile f;
+    /** Spec used to instantiate the StoreFunc at store time. */
+    private String funcSpec;
+    boolean append = false;
+    PigContext pigContext;
+    FileSpec outFileSpec;
+    /** Key under which the stored result is registered in {@link #materializedResults}. */
+    OperatorKey logicalKey;
+    Map<OperatorKey, LocalResult> materializedResults;
+
+    /**
+     * Creates a store operator whose input is already known.
+     *
+     * @param logicalKey          key to register the materialized result under
+     * @param materializedResults map that receives the result after storing
+     * @param input               key of the operator to read tuples from
+     * @param outputFileSpec      target file name and store function spec
+     * @param append              passed to the underlying {@link PigFile}
+     * @param pigContext          context used when storing
+     */
+    public POStore(String scope,
+                   long id,
+                   Map<OperatorKey, ExecPhysicalOperator> opTable,
+                   OperatorKey logicalKey,
+                   Map<OperatorKey, LocalResult> materializedResults,
+                   OperatorKey input,
+                   FileSpec outputFileSpec,
+                   boolean append,
+                   PigContext pigContext) {
+        this(scope, id, opTable, logicalKey, materializedResults,
+             outputFileSpec, append, pigContext);
+        inputs[0] = input;
+        // Formerly printed to System.out; routed through the class logger instead.
+        log.debug("Creating " + outputFileSpec.getFileName());
+    }
+
+    /**
+     * Creates a store operator whose single input slot is left {@code null}.
+     *
+     * @param logicalKey          key to register the materialized result under
+     * @param materializedResults map that receives the result after storing
+     * @param outputFileSpec      target file name and store function spec
+     * @param append              passed to the underlying {@link PigFile}
+     * @param pigContext          context used when storing
+     */
+    public POStore(String scope,
+                   long id,
+                   Map<OperatorKey, ExecPhysicalOperator> opTable,
+                   OperatorKey logicalKey,
+                   Map<OperatorKey, LocalResult> materializedResults,
+                   FileSpec outputFileSpec,
+                   boolean append,
+                   PigContext pigContext) {
+        super(scope, id, opTable, LogicalOperator.FIXED);
+        inputs = new OperatorKey[1];
+        inputs[0] = null;
+        funcSpec = outputFileSpec.getFuncSpec();
+        f = new PigFile(outputFileSpec.getFileName(), append);
+        this.append = append;
+        this.pigContext = pigContext;
+        this.outFileSpec = outputFileSpec;
+        this.logicalKey = logicalKey;
+        this.materializedResults = materializedResults;
+    }
+
+    /**
+     * Reads every tuple from the input, stores them, and registers the
+     * output file as a materialized result.
+     *
+     * @return always {@code null}; this operator produces no tuples
+     * @throws IOException if reading or storing fails; non-I/O failures are
+     *         wrapped in an IOException that keeps the original as its cause
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        // get all tuples from input, and store them.
+        DataBag b = BagFactory.getInstance().newDefaultBag();
+        Tuple t;
+        while ((t = ((PhysicalOperator) opTable.get(inputs[0])).getNext()) != null) {
+            b.add(t);
+        }
+        try {
+            StoreFunc func = (StoreFunc) PigContext.instantiateFuncFromSpec(funcSpec);
+            f.store(b, func, pigContext);
+
+            // a result has materialized, track it!
+            LocalResult materializedResult = new LocalResult(this.outFileSpec);
+
+            materializedResults.put(logicalKey, materializedResult);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            IOException ne = new IOException(e.getClass().getName() + ": " + e.getMessage());
+            // Chain the original exception rather than copying only its stack
+            // trace, so nested causes are not lost.
+            ne.initCause(e);
+            throw ne;
+        }
+
+        return null;
+    }
+
+    /**
+     * Not supported for a store operator.
+     *
+     * @throws RuntimeException always
+     */
+    @Override
+    public int getOutputType() {
+        String message = "No one should be asking my output type";
+        RuntimeException runtimeException = new RuntimeException(message);
+        // Log message and throwable together so the stack trace is recorded.
+        log.error(message, runtimeException);
+        throw runtimeException;
+    }
+
+    @Override
+    public void visit(POVisitor v) {
+        v.visitStore(this);
+    }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java?rev=631443&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/POUnion.java Tue Feb 26 16:51:49 2008
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.POVisitor;
+
+
+/**
+ * Physical operator that concatenates its inputs: it returns every tuple
+ * of the first input, then every tuple of the second, and so on.
+ */
+public class POUnion extends PhysicalOperator {
+    private static final long serialVersionUID = 1L;
+    /** Index into {@code inputs} of the input currently being read. */
+    int currentInput;
+
+    /**
+     * Creates a union over the given inputs. The array is used as-is, not copied.
+     */
+    public POUnion(String scope,
+                   long id,
+                   Map<OperatorKey, ExecPhysicalOperator> opTable,
+                   OperatorKey[] inputsIn,
+                   int outputType) {
+        super(scope, id, opTable, outputType);
+        inputs = inputsIn;
+        currentInput = 0;
+    }
+
+    /**
+     * Creates a union with {@code numInputs} input slots, all initially
+     * {@code null}.
+     */
+    public POUnion(String scope,
+                   long id,
+                   Map<OperatorKey, ExecPhysicalOperator> opTable,
+                   int numInputs,
+                   int outputType) {
+        super(scope, id, opTable, outputType);
+        // A new object array is already filled with nulls; no explicit loop needed.
+        inputs = new OperatorKey[numInputs];
+        currentInput = 0;
+    }
+
+    /**
+     * Opens the operator and rewinds to the first input.
+     *
+     * @return {@code false} if the superclass refuses to open
+     */
+    @Override
+    public boolean open() throws IOException {
+        if (!super.open()) {
+            return false;
+        }
+        currentInput = 0;
+        return true;
+    }
+
+    /**
+     * @return the next tuple from the current input, advancing to the
+     *         following input whenever one is exhausted; {@code null} after
+     *         the last input is exhausted
+     */
+    @Override
+    public Tuple getNext() throws IOException {
+        while (currentInput < inputs.length) {
+            Tuple t = ((PhysicalOperator) opTable.get(inputs[currentInput])).getNext();
+
+            if (t == null) {
+                // Current input is exhausted; move on to the next one.
+                currentInput++;
+                continue;
+            }
+
+            Tuple output = t;
+            if (lineageTracer != null) {
+                lineageTracer.insert(output); // update lineage (this line is needed, to generate the correct counts)
+                lineageTracer.union(t, output); // (this line amounts to a no-op)
+            }
+            return output;
+        }
+
+        return null;
+    }
+
+    // Annotated like the visit() overrides of the sibling operators.
+    @Override
+    public void visit(POVisitor v) {
+        v.visitUnion(this);
+    }
+
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/BinStorage.java Tue Feb 26 16:51:49 2008
@@ -47,12 +47,12 @@
public BinStorage() {
}
- public Tuple getNext() throws IOException {
+ public Tuple getNext() throws IOException {
byte b = 0;
// skip to next record
while (true) {
- if (in == null || in.getPosition() >=end) {
+ if (in == null || in.getPosition() >=end) {
return null;
}
b = (byte) in.read();
@@ -75,7 +75,7 @@
return (Tuple)DataReaderWriter.readDatum(inData);
}
- public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+ public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
this.in = in;
inData = new DataInputStream(in);
this.end = end;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/COUNT.java Tue Feb 26 16:51:49 2008
@@ -29,6 +29,7 @@
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
/**
* Generates the count of the values of the first field of a tuple. This class is Algebraic in
@@ -99,7 +100,7 @@
// just too much.
sum += (Long)t.get(0);
} catch (NumberFormatException exp) {
- throw new IOException(exp.getClass().getName() + ":" + exp.getMessage());
+ throw WrappedIOException.wrap(exp.getClass().getName() + ":" + exp.getMessage(), exp);
}
}
return sum;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MAX.java Tue Feb 26 16:51:49 2008
@@ -28,6 +28,7 @@
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/MIN.java Tue Feb 26 16:51:49 2008
@@ -28,6 +28,7 @@
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.WrappedIOException;
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigDump.java Tue Feb 26 16:51:49 2008
@@ -26,21 +26,21 @@
public class PigDump implements StoreFunc {
- public static String recordDelimiter = "\n";
-
+ public static String recordDelimiter = "\n";
+
OutputStream os;
- public void bindTo(OutputStream os) throws IOException {
- this.os = os;
- }
+ public void bindTo(OutputStream os) throws IOException {
+ this.os = os;
+ }
- public void finish() throws IOException {
-
- }
+ public void finish() throws IOException {
+
+ }
- public void putNext(Tuple f) throws IOException {
- os.write((f.toString() + recordDelimiter).getBytes());
- }
+ public void putNext(Tuple f) throws IOException {
+ os.write((f.toString() + recordDelimiter).getBytes());
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/PigStorage.java Tue Feb 26 16:51:49 2008
@@ -17,16 +17,10 @@
*/
package org.apache.pig.builtin;
-import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.text.SimpleDateFormat;
import java.util.ArrayList;
-import java.util.Iterator;
import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFunc;
Modified: incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/builtin/TextLoader.java Tue Feb 26 16:51:49 2008
@@ -20,6 +20,7 @@
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
+import java.nio.charset.Charset;
import org.apache.pig.LoadFunc;
import org.apache.pig.data.Tuple;
@@ -32,14 +33,13 @@
* contains the line of text.
*/
public class TextLoader implements LoadFunc{
- BufferedPositionedInputStream in;
- private BufferedReader inData = null;
- long end;
+ BufferedPositionedInputStream in;
+ final private static Charset utf8 = Charset.forName("UTF8");
+ long end;
private TupleFactory mTupleFactory = TupleFactory.getInstance();
- public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
+ public void bindTo(String fileName, BufferedPositionedInputStream in, long offset, long end) throws IOException {
this.in = in;
- inData = new BufferedReader(new InputStreamReader(in, "UTF8"));
this.end = end;
// Since we are not block aligned we throw away the first
// record and cound on a different instance to read it
@@ -47,11 +47,11 @@
getNext();
}
- public Tuple getNext() throws IOException {
+ public Tuple getNext() throws IOException {
if (in == null || in.getPosition() > end)
return null;
String line;
- if ((line = inData.readLine()) != null) {
+ if ((line = in.readLine(utf8, (byte)'\n')) != null) {
return mTupleFactory.newTuple(new String(line));
}
return null;
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataBag.java Tue Feb 26 16:51:49 2008
@@ -31,7 +31,6 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.pig.impl.util.Spillable;
-import org.apache.pig.impl.mapreduceExec.PigMapReduce;
/**
* A collection of Tuples. A DataBag may or may not fit into memory.
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultAbstractBag.java Tue Feb 26 16:51:49 2008
@@ -29,7 +29,7 @@
import java.util.ArrayList;
import org.apache.pig.impl.util.Spillable;
-import org.apache.pig.impl.mapreduceExec.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
/**
* A collection of Tuples. A DataBag may or may not fit into memory.
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java Tue Feb 26 16:51:49 2008
@@ -17,17 +17,18 @@
*/
package org.apache.pig.data;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.io.DataOutputStream;
-import java.io.DataInputStream;
import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.EOFException;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.io.FileNotFoundException;
-import org.apache.pig.impl.util.PigLogger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
@@ -39,6 +40,8 @@
private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+ private final Log log = LogFactory.getLog(getClass());
+
public DefaultDataBag() {
mContents = new ArrayList<Tuple>();
}
@@ -67,13 +70,22 @@
// trying to read while I'm mucking with the container.
long spilled = 0;
synchronized (mContents) {
+ DataOutputStream out = null;
+ try {
+ out = getSpillFile();
+ } catch (IOException ioe) {
+ // Do not remove last file from spilled array. It was not
+ // added as File.createTmpFile threw an IOException
+ log.error(
+ "Unable to create tmp file to spill to disk", ioe);
+ return 0;
+ }
try {
- DataOutputStream out = getSpillFile();
Iterator<Tuple> i = mContents.iterator();
while (i.hasNext()) {
i.next().write(out);
spilled++;
- // This will report progress every 16383 records.
+ // This will report progress every 16384 records.
if ((spilled & 0x3fff) == 0) reportProgress();
}
out.flush();
@@ -81,9 +93,17 @@
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
- PigLogger.getLogger().error(
+ log.error(
"Unable to spill contents to disk", ioe);
return 0;
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ log.error("Error closing spill", e);
+ }
+ }
}
mContents.clear();
}
@@ -160,7 +180,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to find our spill file", fnfe);
throw new RuntimeException(fnfe);
}
@@ -171,11 +191,11 @@
} catch (EOFException eof) {
// This should never happen, it means we
// didn't dump all of our tuples to disk.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Ran out of tuples too soon.", eof);
- throw new RuntimeException("Ran out of tuples to read prematurely.");
+ throw new RuntimeException("Ran out of tuples to read prematurely.", eof);
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
@@ -205,7 +225,7 @@
// Fall through to the next case where we find the
// next file, or go to memory
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
@@ -234,7 +254,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- PigLogger.getLogger().fatal("Unable to find our spill file",
+ log.fatal("Unable to find our spill file",
fnfe);
throw new RuntimeException(fnfe);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DistinctDataBag.java Tue Feb 26 16:51:49 2008
@@ -17,26 +17,25 @@
*/
package org.apache.pig.data;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.ListIterator;
-import java.util.TreeSet;
-import java.util.Arrays;
import java.io.BufferedInputStream;
-import java.io.DataOutputStream;
import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.TreeSet;
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.util.PigLogger;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
@@ -51,6 +50,9 @@
* found to be faster than storing it in a TreeSet.
*/
public class DistinctDataBag extends DefaultAbstractBag {
+
+ private final Log log = LogFactory.getLog(getClass());
+
private static TupleFactory gTupleFactory = TupleFactory.getInstance();
public DistinctDataBag() {
@@ -104,8 +106,17 @@
// trying to read while I'm mucking with the container.
long spilled = 0;
synchronized (mContents) {
+ DataOutputStream out = null;
+ try {
+ out = getSpillFile();
+ } catch (IOException ioe) {
+ // Do not remove last file from spilled array. It was not
+ // added as File.createTmpFile threw an IOException
+ log.error(
+ "Unable to create tmp file to spill to disk", ioe);
+ return 0;
+ }
try {
- DataOutputStream out = getSpillFile();
// If we've already started reading, then it will already be
// sorted into an array list. If not, we need to sort it
// before writing.
@@ -133,9 +144,17 @@
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
- PigLogger.getLogger().error(
+ log.error(
"Unable to spill contents to disk", ioe);
return 0;
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ log.error("Error closing spill", e);
+ }
+ }
}
mContents.clear();
}
@@ -241,7 +260,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to find our spill file", fnfe);
throw new RuntimeException(fnfe);
}
@@ -255,9 +274,9 @@
} catch (EOFException eof) {
// This should never happen, it means we
// didn't dump all of our tuples to disk.
- throw new RuntimeException("Ran out of tuples to read prematurely.");
+ throw new RuntimeException("Ran out of tuples to read prematurely.", eof);
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
@@ -302,7 +321,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// never happen.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to find out spill file.", fnfe);
throw new RuntimeException(fnfe);
}
@@ -377,7 +396,7 @@
mStreams.set(fileNum, null);
return;
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
@@ -444,7 +463,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// neer happen.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to find out spill file.", fnfe);
throw new RuntimeException(fnfe);
}
@@ -463,7 +482,7 @@
}
out.flush();
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java Tue Feb 26 16:51:49 2008
@@ -26,36 +26,36 @@
*/
public class IndexedTuple extends DefaultTuple {
- public int index = -1;
-
- public IndexedTuple() {
- }
-
- public IndexedTuple(Tuple t, int indexIn) {
+ public int index = -1;
+
+ public IndexedTuple() {
+ }
+
+ public IndexedTuple(Tuple t, int indexIn) {
// Have to do it like this because Tuple is an interface, we don't
// have access to its internal structures.
super(t.getAll());
- index = indexIn;
- }
+ index = indexIn;
+ }
- @Override
- public String toString() {
- return super.toString() + "[" + index + "]";
- }
+ @Override
+ public String toString() {
+ return super.toString() + "[" + index + "]";
+ }
- // Writable methods:
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
+ // Writable methods:
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
out.writeInt(index);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- index = in.readInt();
- }
-
- public Tuple toTuple(){
+ }
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ index = in.readInt();
+ }
+
+ public Tuple toTuple(){
return TupleFactory.getInstance().newTuple(mFields);
- }
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/SortedDataBag.java Tue Feb 26 16:51:49 2008
@@ -17,13 +17,6 @@
*/
package org.apache.pig.data;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.ListIterator;
-import java.util.PriorityQueue;
-import java.util.Iterator;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -32,16 +25,24 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.ListIterator;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.util.PigLogger;
/**
* An ordered collection of Tuples (possibly) with multiples. Data is
- * stored unsorted in an ArrayList as it comes in, and only sorted when it
- * is time to dump
+ * stored unsorted as it comes in, and only sorted when it is time to dump
* it to a file or when the first iterator is requested. Experimentation
* found this to be faster than storing it sorted to begin with.
*
@@ -51,6 +52,8 @@
public class SortedDataBag extends DefaultAbstractBag {
private static TupleFactory gTupleFactory = TupleFactory.getInstance();
+ private final Log log = LogFactory.getLog(getClass());
+
private Comparator<Tuple> mComp;
private boolean mReadStarted = false;
@@ -104,8 +107,17 @@
// trying to read while I'm mucking with the container.
long spilled = 0;
synchronized (mContents) {
+ DataOutputStream out = null;
+ try {
+ out = getSpillFile();
+ } catch (IOException ioe) {
+ // Do not remove last file from spilled array. It was not
+ // added as File.createTempFile threw an IOException
+ log.error(
+ "Unable to create tmp file to spill to disk", ioe);
+ return 0;
+ }
try {
- DataOutputStream out = getSpillFile();
// Have to sort the data before we can dump it. It's bogus
// that we have to do this under the lock, but there's no way
// around it. If the reads already started, then we've
@@ -128,9 +140,17 @@
// Remove the last file from the spilled array, since we failed to
// write to it.
mSpillFiles.remove(mSpillFiles.size() - 1);
- PigLogger.getLogger().error(
+ log.error(
"Unable to spill contents to disk", ioe);
return 0;
+ } finally {
+ if (out != null) {
+ try {
+ out.close();
+ } catch (IOException e) {
+ log.error("Error closing spill", e);
+ }
+ }
}
mContents.clear();
}
@@ -239,7 +259,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should never
// happen.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to find our spill file", fnfe);
throw new RuntimeException(fnfe);
}
@@ -253,11 +273,11 @@
} catch (EOFException eof) {
// This should never happen, it means we
// didn't dump all of our tuples to disk.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Ran out of tuples too soon.", eof);
- throw new RuntimeException("Ran out of tuples to read prematurely.");
+ throw new RuntimeException("Ran out of tuples to read prematurely.", eof);
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
@@ -304,7 +324,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// never happen.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to find our spill file", fnfe);
throw new RuntimeException(fnfe);
}
@@ -363,7 +383,7 @@
// this file.
mStreams.set(fileNum, null);
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
@@ -427,7 +447,7 @@
} catch (FileNotFoundException fnfe) {
// We can't find our own spill file? That should
// never happen.
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to find our spill file", fnfe);
throw new RuntimeException(fnfe);
}
@@ -446,7 +466,7 @@
}
out.flush();
} catch (IOException ioe) {
- PigLogger.getLogger().fatal(
+ log.fatal(
"Unable to read our spill file", ioe);
throw new RuntimeException(ioe);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/TimestampedTuple.java Tue Feb 26 16:51:49 2008
@@ -21,48 +21,50 @@
import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
public class TimestampedTuple extends DefaultTuple {
+ private final Log log = LogFactory.getLog(getClass());
+ static String defaultDelimiter = "[,\t]";
+
protected double timestamp = 0; // timestamp of this tuple
protected boolean heartbeat = false; // true iff this is a heartbeat (i.e. purpose is just to convey new timestamp; carries no data)
public double getTimeStamp() {
- return timestamp;
+ return timestamp;
}
public void setTimeStamp(double t) {
- this.timestamp = t;
+ this.timestamp = t;
}
public boolean isHeartBeat() {
- return heartbeat;
+ return heartbeat;
}
public void setHeartBeat(boolean h) {
- this.heartbeat = h;
+ this.heartbeat = h;
}
public TimestampedTuple(int numFields) {
super(numFields);
}
- /*
public TimestampedTuple(String textLine, String delimiter, int timestampColumn,
- SimpleDateFormat dateFormat){
+ SimpleDateFormat dateFormat){
if (delimiter == null) {
delimiter = defaultDelimiter;
}
String[] splitString = textLine.split(delimiter, -1);
- fields = new ArrayList<Datum>(splitString.length-1);
+ mFields = new ArrayList<Object>(splitString.length-1);
for (int i = 0; i < splitString.length; i++) {
- if (i==timestampColumn){
- try{
- timestamp = dateFormat.parse(splitString[i]).getTime()/1000.0;
- }catch(ParseException e){
- System.err.println("Could not parse timestamp " + splitString[i]);
- }
- }else{
- fields.add(new DataAtom(splitString[i]));
- }
+ if (i==timestampColumn){
+ try{
+ timestamp = dateFormat.parse(splitString[i]).getTime()/1000.0;
+ }catch(ParseException e){
+ log.error("Could not parse timestamp " + splitString[i]);
+ }
+ }else{
+ mFields.add(new String(splitString[i]));
+ }
}
}
- */
-
-
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java?rev=631443&r1=631442&r2=631443&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/FunctionInstantiator.java Tue Feb 26 16:51:49 2008
@@ -21,5 +21,5 @@
public interface FunctionInstantiator {
- public Object instantiateFuncFromAlias(String alias) throws IOException;
+ public Object instantiateFuncFromAlias(String alias) throws IOException;
}