You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/04 23:41:22 UTC
svn commit: r692261 - in /incubator/pig/branches/types/src/org/apache/pig:
backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
impl/logicalLayer/LOStream.java
Author: olga
Date: Thu Sep 4 14:41:21 2008
New Revision: 692261
URL: http://svn.apache.org/viewvc?rev=692261&view=rev
Log:
missing files from stream merge
Added:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java
Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java?rev=692261&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/MRStreamHandler.java Thu Sep 4 14:41:21 2008
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
+
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Visitor over an MR plan that inspects every {@link MapReduceOper} and
+ * records on it whether its map plan and/or its reduce plan contains a
+ * {@link POStream}. The flags are only ever set to true here; an operator
+ * whose plans hold no stream is left untouched.
+ */
+public class MRStreamHandler extends MROpPlanVisitor {
+
+    /** @param plan MR plan to visit */
+    public MRStreamHandler(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    /**
+     * Marks the operator when its map plan or reduce plan contains a stream.
+     *
+     * @param mr the map-reduce operator whose plans are checked
+     * @throws VisitorException propagated from walking either plan
+     */
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        if (containsStream(mr.mapPlan)) {
+            mr.setStreamInMap(true);
+        }
+        if (containsStream(mr.reducePlan)) {
+            mr.setStreamInReduce(true);
+        }
+    }
+
+    /** Walks the given physical plan and reports whether a POStream was seen. */
+    private static boolean containsStream(PhysicalPlan plan) throws VisitorException {
+        StreamChecker checker = new StreamChecker(plan);
+        checker.visit();
+        return checker.isStreamPresent();
+    }
+
+    /** Remembers whether any {@link POStream} was visited; static since it uses no outer state. */
+    static class StreamChecker extends PhyPlanVisitor {
+
+        private boolean streamPresent = false;
+
+        public StreamChecker(PhysicalPlan plan) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        }
+
+        @Override
+        public void visitStream(POStream stream) throws VisitorException {
+            streamPresent = true;
+        }
+
+        /**
+         * @return true if a POStream was visited, false otherwise
+         */
+        public boolean isStreamPresent() {
+            return streamPresent;
+        }
+    }
+}
+
Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=692261&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOStream.java Thu Sep 4 14:41:21 2008
@@ -0,0 +1,114 @@
+/**
+ *
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+/**
+ * {@link LOStream} represents the specification of an external
+ * command to be executed in a Pig Query.
+ *
+ * <code>LOStream</code> encapsulates all relevant details of the
+ * command specified by the user either directly via the <code>STREAM</code>
+ * operator or indirectly via a <code>DEFINE</code> operator. It includes
+ * details such as input/output/error specifications and also files to be
+ * shipped to the cluster and files to be cached.
+ */
+public class LOStream extends LogicalOperator {
+
+    private static final long serialVersionUID = 2L;
+
+    // the StreamingCommand object for the
+    // Stream Operator this operator represents
+    private StreamingCommand StrCmd;
+
+    // the ExecutableManager supplied at construction
+    private ExecutableManager executableManager;
+
+    /**
+     * Create a new <code>LOStream</code> with the given command.
+     *
+     * @param plan the logical plan this operator is a part of
+     * @param k the operator key for this operator
+     * @param input the input operator (not stored by this class)
+     * @param exeManager the ExecutableManager for this operator
+     * @param cmd the StreamingCommand returned by
+     *            {@link #getStreamingCommand()}
+     */
+    public LOStream(LogicalPlan plan, OperatorKey k, LogicalOperator input, ExecutableManager exeManager, StreamingCommand cmd) {
+        super(plan, k);
+        this.StrCmd = cmd;
+        this.executableManager = exeManager;
+    }
+
+    /**
+     * Get the StreamingCommand object associated
+     * with this operator
+     *
+     * @return the StreamingCommand object
+     */
+    public StreamingCommand getStreamingCommand() {
+        return StrCmd;
+    }
+
+    /**
+     * Returns the schema currently held in <code>mSchema</code>. Nothing is
+     * computed here: the inherited field is returned as is, without
+     * consulting this operator's input.
+     *
+     * @return the stored schema
+     * @throws FrontendException declared for the overridden signature; this
+     *         implementation does not throw it
+     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#getSchema()
+     */
+    @Override
+    public Schema getSchema() throws FrontendException {
+        return mSchema;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.logicalLayer.LogicalOperator#visit(org.apache.pig.impl.logicalLayer.LOVisitor)
+     */
+    @Override
+    public void visit(LOVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    /**
+     * Builds the display name from the command and the operator key.
+     * String concatenation is used instead of an explicit
+     * <code>toString()</code> call so that a null command is rendered as
+     * "null" rather than raising a NullPointerException.
+     *
+     * @return {@code "Stream (" + command + ") " + scope + "-" + id}
+     * @see org.apache.pig.impl.plan.Operator#name()
+     */
+    @Override
+    public String name() {
+        return "Stream (" + StrCmd + ") " + mKey.scope + "-" + mKey.id;
+    }
+
+    /**
+     * Reports that this operator does not accept multiple inputs.
+     *
+     * @return always false
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleInputs()
+     */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /**
+     * @return the ExecutableManager
+     */
+    public ExecutableManager getExecutableManager() {
+        return executableManager;
+    }
+
+}