You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2011/07/19 03:01:59 UTC
svn commit: r1148117 [1/3] - in /pig/trunk: ./ shims/ shims/src/
shims/src/hadoop20/ shims/src/hadoop20/org/ shims/src/hadoop20/org/apache/
shims/src/hadoop20/org/apache/pig/ shims/src/hadoop20/org/apache/pig/backend/
shims/src/hadoop20/org/apache/pig/...
Author: daijy
Date: Tue Jul 19 01:01:53 2011
New Revision: 1148117
URL: http://svn.apache.org/viewvc?rev=1148117&view=rev
Log:
Make Pig work with hadoop .NEXT
Added:
pig/trunk/shims/
pig/trunk/shims/src/
pig/trunk/shims/src/hadoop20/
pig/trunk/shims/src/hadoop20/org/
pig/trunk/shims/src/hadoop20/org/apache/
pig/trunk/shims/src/hadoop20/org/apache/pig/
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/shims/src/hadoop23/
pig/trunk/shims/src/hadoop23/org/
pig/trunk/shims/src/hadoop23/org/apache/
pig/trunk/shims/src/hadoop23/org/apache/pig/
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/shims/test/
pig/trunk/shims/test/hadoop20/
pig/trunk/shims/test/hadoop20/org/
pig/trunk/shims/test/hadoop20/org/apache/
pig/trunk/shims/test/hadoop20/org/apache/pig/
pig/trunk/shims/test/hadoop20/org/apache/pig/test/
pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
pig/trunk/shims/test/hadoop23/
pig/trunk/shims/test/hadoop23/org/
pig/trunk/shims/test/hadoop23/org/apache/
pig/trunk/shims/test/hadoop23/org/apache/pig/
pig/trunk/shims/test/hadoop23/org/apache/pig/test/
pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/build.xml
pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputFormat.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/WeightedRangePartitioner.java
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/src/org/apache/pig/impl/io/PigFile.java
pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
pig/trunk/src/overview.html
pig/trunk/test/findbugsExcludeFile.xml
pig/trunk/test/org/apache/pig/test/MiniCluster.java
pig/trunk/test/org/apache/pig/test/TestGrunt.java
pig/trunk/test/org/apache/pig/test/TestTypedMap.java
pig/trunk/test/org/apache/pig/test/TestUDFContext.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jul 19 01:01:53 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2125: Make Pig work with hadoop .NEXT (daijy)
+
PIG-2143: Make PigStorage optionally store schema; improve docs. (dvryaboy)
PIG-1973: UDFContext.getUDFContext usage of ThreadLocal pattern
Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Tue Jul 19 01:01:53 2011
@@ -20,6 +20,8 @@
xmlns:ivy="antlib:org.apache.ivy.ant">
<!-- Load all the default properties, and any the user wants -->
<!-- to contribute (without having to type -D or edit this file -->
+ <taskdef resource="net/sf/antcontrib/antcontrib.properties"/>
+
<property file="${user.home}/build.properties" />
<property file="${basedir}/build.properties" />
@@ -145,6 +147,16 @@
<property name="ivy.repo.dir" value="${user.home}/ivyrepo" />
<property name="ivy.dir" location="ivy" />
<loadproperties srcfile="${ivy.dir}/libraries.properties"/>
+
+ <!--propertyregex property="hadoopversion"
+ input="${hadoop-core.version}"
+ regexp="\d+\.(\d+)\.\d+"
+ select="\1" /-->
+ <property name="hadoopversion" value="20" />
+
+ <property name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
+ <property name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
+
<property name="hadoop.jar" value="hadoop-core-${hadoop-core.version}.jar" />
<property name="asfrepo" value="https://repository.apache.org"/>
<property name="asfsnapshotrepo" value="${asfrepo}/content/repositories/snapshots"/>
@@ -194,7 +206,7 @@
<!-- setup the classpath -->
<path id="classpath">
<path refid="compile.classpath"/>
- <fileset file="${lib.dir}/${automaton.jarfile}" />
+ <fileset dir="${lib.dir}" includes="*.jar"/>
<fileset file="${ivy.lib.dir}/${zookeeper.jarfile}"/>
</path>
@@ -349,13 +361,13 @@
<echo>*** Else, compile-sources (which only warns about deprecations) target will be executed ***</echo>
<antcall target="compile-sources" inheritRefs="true" inheritall="true">
- <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2" />
+ <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2;${src.shims.dir}" />
<param name="dist" value="${build.classes}" />
<param name="cp" value="classpath" />
</antcall>
<antcall target="compile-sources-all-warnings" inheritRefs="true" inheritall="true">
- <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2" />
+ <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2;${src.shims.dir}" />
<param name="dist" value="${build.classes}" />
<param name="cp" value="classpath" />
</antcall>
@@ -369,7 +381,7 @@
<echo>*** Else, compile-sources (which only warns about deprecations) target will be executed ***</echo>
<antcall target="compile-sources" inheritRefs="true" inheritall="true">
- <param name="sources" value="${test.src.dir}" />
+ <param name="sources" value="${test.src.dir};${src.shims.test.dir}" />
<param name="dist" value="${test.build.classes}" />
<param name="cp" value="test.classpath" />
<!-- don't compile PigTestLoader -->
@@ -377,7 +389,7 @@
</antcall>
<antcall target="compile-sources-all-warnings" inheritRefs="true" inheritall="true">
- <param name="sources" value="${test.src.dir}" />
+ <param name="sources" value="${test.src.dir};${src.shims.test.dir}" />
<param name="dist" value="${test.build.classes}" />
<param name="cp" value="test.classpath" />
<!-- don't compile PigTestLoader -->
@@ -667,7 +679,7 @@
<mkdir dir="${test.log.dir}"/>
<tempfile property="junit.tmp.dir" prefix="pig_junit_tmp" destDir="${java.io.tmpdir}" />
<mkdir dir="${junit.tmp.dir}/"/>
- <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="512m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
+ <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="1024m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
<sysproperty key="ssh.gateway" value="${ssh.gateway}" />
<sysproperty key="hod.server" value="${hod.server}" />
<sysproperty key="java.io.tmpdir" value="${junit.tmp.dir}" />
Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+
+abstract public class PigMapBase extends PigGenericMapBase {
+ /**
+ *
+ * Get mapper's illustrator context
+ *
+ * @param conf Configuration
+ * @param input Input bag to serve as data source
+ * @param output Map output buffer
+ * @param split the split
+ * @return Illustrator's context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public Context getIllustratorContext(Configuration conf, DataBag input,
+ List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+ throws IOException, InterruptedException {
+ return new IllustratorContext(conf, input, output, split);
+ }
+
+ public class IllustratorContext extends Context {
+ private DataBag input;
+ List<Pair<PigNullableWritable, Writable>> output;
+ private Iterator<Tuple> it = null;
+ private Tuple value = null;
+ private boolean init = false;
+
+ public IllustratorContext(Configuration conf, DataBag input,
+ List<Pair<PigNullableWritable, Writable>> output,
+ InputSplit split) throws IOException, InterruptedException {
+ super(conf, new TaskAttemptID(), null, null, null, null, split);
+ if (output == null)
+ throw new IOException("Null output can not be used");
+ this.input = input; this.output = output;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (input == null) {
+ if (!init) {
+ init = true;
+ return true;
+ }
+ return false;
+ }
+ if (it == null)
+ it = input.iterator();
+ if (!it.hasNext())
+ return false;
+ value = it.next();
+ return true;
+ }
+
+ @Override
+ public Text getCurrentKey() {
+ return null;
+ }
+
+ @Override
+ public Tuple getCurrentValue() {
+ return value;
+ }
+
+ @Override
+ public void write(PigNullableWritable key, Writable value)
+ throws IOException, InterruptedException {
+ output.add(new Pair<PigNullableWritable, Writable>(key, value));
+ }
+
+ @Override
+ public void progress() {
+
+ }
+ }
+}
Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+
+public class PigMapReduce extends PigGenericMapReduce {
+    public static class Reduce extends PigGenericMapReduce.Reduce {
+        /**
+         * Get reducer's illustrator context
+         *
+         * @param job job whose configuration backs the context
+         * @param input Input buffer as output by maps; it is sorted in place by the context
+         * @param pkg package
+         * @return reducer's illustrator context
+         * @throws IOException
+         * @throws InterruptedException
+         */
+        @Override
+        public Context getIllustratorContext(Job job,
+                List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+            return new IllustratorContext(job, input, pkg);
+        }
+
+        /**
+         * Reduce context that replays an in-memory list of map output pairs.
+         * <p>
+         * The constructor sorts {@code input} in place using the job's sort comparator
+         * applied to the serialized keys. {@link #nextKey()} then groups consecutive entries
+         * whose keys compare equal under the job's grouping comparator. Anything written
+         * through this context is discarded.
+         */
+        @SuppressWarnings("unchecked")
+        public class IllustratorContext extends Context {
+            private PigNullableWritable currentKey = null, nextKey = null;
+            private NullableTuple nextValue = null;
+            private List<NullableTuple> currentValues = null;
+            private Iterator<Pair<PigNullableWritable, Writable>> it;
+            // Scratch buffer used to serialize two keys for a raw sort comparison.
+            private final ByteArrayOutputStream bos;
+            private final DataOutputStream dos;
+            private final RawComparator sortComparator, groupingComparator;
+            POPackage pack = null;
+
+            public IllustratorContext(Job job,
+                    List<Pair<PigNullableWritable, Writable>> input,
+                    POPackage pkg
+                    ) throws IOException, InterruptedException {
+                super(job.getJobConf(), new TaskAttemptID(), new FakeRawKeyValueIterator(input.iterator().hasNext()),
+                        null, null, null, null, null, null, PigNullableWritable.class, NullableTuple.class);
+                bos = new ByteArrayOutputStream();
+                dos = new DataOutputStream(bos);
+                org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+                sortComparator = nwJob.getSortComparator();
+                groupingComparator = nwJob.getGroupingComparator();
+
+                Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+                        @Override
+                        public int compare(Pair<PigNullableWritable, Writable> o1,
+                                           Pair<PigNullableWritable, Writable> o2) {
+                            try {
+                                // Serialize both keys back to back, then compare the two byte ranges.
+                                o1.first.write(dos);
+                                int l1 = bos.size();
+                                o2.first.write(dos);
+                                int l2 = bos.size();
+                                byte[] bytes = bos.toByteArray();
+                                return sortComparator.compare(bytes, 0, l1, bytes, l1, l2 - l1);
+                            } catch (IOException e) {
+                                // Keep the original exception as the cause for diagnosis.
+                                throw new RuntimeException("Serialization exception in sort:"+e.getMessage(), e);
+                            } finally {
+                                // Always clear the scratch buffer, even if serialization failed part-way.
+                                bos.reset();
+                            }
+                        }
+                    }
+                );
+                currentValues = new ArrayList<NullableTuple>();
+                it = input.iterator();
+                if (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    nextKey = entry.first;
+                    nextValue = (NullableTuple) entry.second;
+                }
+                pack = pkg;
+            }
+
+            @Override
+            public PigNullableWritable getCurrentKey() {
+                return currentKey;
+            }
+
+            /**
+             * Advances to the next key group.
+             *
+             * @return false once all entries have been consumed
+             */
+            @Override
+            public boolean nextKey() {
+                if (nextKey == null)
+                    return false;
+                currentKey = nextKey;
+                currentValues.clear();
+                currentValues.add(nextValue);
+                nextKey = null;
+                while (it.hasNext()) {
+                    Pair<PigNullableWritable, Writable> entry = it.next();
+                    // Grouping compares the key objects directly rather than their serialized bytes.
+                    if (groupingComparator.compare(currentKey, entry.first) == 0) {
+                        currentValues.add((NullableTuple) entry.second);
+                    } else {
+                        // First entry of the next group: remember it for the following call.
+                        nextKey = entry.first;
+                        nextValue = (NullableTuple) entry.second;
+                        break;
+                    }
+                }
+                return true;
+            }
+
+            /** Returns the values of the current key group. */
+            @Override
+            public Iterable<NullableTuple> getValues() {
+                return currentValues;
+            }
+
+            /** Output is discarded in illustrate mode. */
+            @Override
+            public void write(PigNullableWritable k, Writable t) {
+            }
+
+            /** Progress reporting is a no-op here. */
+            @Override
+            public void progress() {
+            }
+        }
+    }
+}
Added: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (added)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * We need to make Pig work with both hadoop 20 and hadoop 23 (PIG-2125). However,
+ * there are API differences between hadoop 20 and 23, so a shims layer is used to
+ * hide them. A dynamic shims layer is not possible because of some static
+ * dependencies, so we adopt a static shims approach: for each hadoop version,
+ * Pig has to be recompiled.
+ *
+ * This class wraps the version-dependent static methods. PigMapReduce, PigMapBase and
+ * MiniCluster provide the hadoop-version-dependent implementations of PigGenericMapReduce,
+ * PigGenericMapBase and MiniGenericCluster.
+ **/
+public class HadoopShims {
+    /**
+     * Creates a new JobContext with the same configuration and job ID as {@code original}.
+     */
+    static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
+        return createJobContext(original.getConfiguration(), original.getJobID());
+    }
+
+    /**
+     * Creates a TaskAttemptContext for the given configuration and attempt ID
+     * (this hadoop 20 variant instantiates the class directly).
+     */
+    static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+                                TaskAttemptID taskId) {
+        return new TaskAttemptContext(conf, taskId);
+    }
+
+    /**
+     * Creates a JobContext for the given configuration and job ID
+     * (this hadoop 20 variant instantiates the class directly).
+     */
+    static public JobContext createJobContext(Configuration conf,
+                                JobID jobId) {
+        return new JobContext(conf, jobId);
+    }
+
+    /** Returns whether the given attempt belongs to a map task. */
+    static public boolean isMap(TaskAttemptID taskAttemptID) {
+        return taskAttemptID.isMap();
+    }
+}
Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapBase;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+
+abstract public class PigMapBase extends PigGenericMapBase {
+    /**
+     *
+     * Get mapper's illustrator context
+     *
+     * @param conf Configuration
+     * @param input Input bag to serve as data source
+     * @param output Map output buffer
+     * @param split the split
+     * @return Illustrator's context
+     * @throws IOException if {@code output} is null
+     * @throws InterruptedException
+     */
+    @Override
+    public Context getIllustratorContext(Configuration conf, DataBag input,
+            List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+            throws IOException, InterruptedException {
+        return new IllustratorContext(conf, input, output, split);
+    }
+
+    /**
+     * Map context that reads tuples from an in-memory bag and captures map output
+     * into a caller-supplied list.
+     * <p>
+     * This context does not hand its configuration and split to a superclass, so it
+     * stores them itself and returns them from {@link #getConfiguration()} and
+     * {@link #getInputSplit()}. Like the hadoop 20 shim, which passes
+     * {@code new TaskAttemptID()} to its superclass, it reports a default task attempt
+     * ID and that attempt's job ID. Other job-level settings are not modelled and return
+     * {@code null}, {@code 0} or {@code false}.
+     */
+    public class IllustratorContext extends Context {
+        private DataBag input;
+        // Destination list for (key, value) pairs passed to write().
+        List<Pair<PigNullableWritable, Writable>> output;
+        private Iterator<Tuple> it = null;
+        private Tuple value = null;
+        private boolean init = false;
+        private final Configuration conf;
+        private final InputSplit split;
+        private final TaskAttemptID taskAttemptID = new TaskAttemptID();
+        private String status = null;
+
+        public IllustratorContext(Configuration conf, DataBag input,
+                List<Pair<PigNullableWritable, Writable>> output,
+                InputSplit split) throws IOException, InterruptedException {
+            if (output == null) {
+                throw new IOException("Null output can not be used");
+            }
+            this.input = input;
+            this.output = output;
+            this.conf = conf;
+            this.split = split;
+        }
+
+        /**
+         * Advances to the next tuple of the bag. With no bag, reports exactly one record
+         * (whose value stays null) and then end of input.
+         */
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            if (input == null) {
+                if (!init) {
+                    init = true;
+                    return true;
+                }
+                return false;
+            }
+            if (it == null)
+                it = input.iterator();
+            if (!it.hasNext())
+                return false;
+            value = it.next();
+            return true;
+        }
+
+        /** Keys are never produced by this context. */
+        @Override
+        public Text getCurrentKey() {
+            return null;
+        }
+
+        /** Returns the tuple fetched by the last successful nextKeyValue(). */
+        @Override
+        public Tuple getCurrentValue() {
+            return value;
+        }
+
+        /** Captures a map output pair into the output list. */
+        @Override
+        public void write(PigNullableWritable key, Writable value)
+                throws IOException, InterruptedException {
+            output.add(new Pair<PigNullableWritable, Writable>(key, value));
+        }
+
+        /** Progress reporting is a no-op here. */
+        @Override
+        public void progress() {
+        }
+
+        /** Returns the split this context was created with. */
+        @Override
+        public InputSplit getInputSplit() {
+            return split;
+        }
+
+        // NOTE(review): counters are not modelled; callers that increment them must handle null.
+        @Override
+        public Counter getCounter(Enum<?> arg0) {
+            return null;
+        }
+
+        @Override
+        public Counter getCounter(String arg0, String arg1) {
+            return null;
+        }
+
+        @Override
+        public OutputCommitter getOutputCommitter() {
+            return null;
+        }
+
+        /** Returns the last status passed to setStatus(), or null. */
+        @Override
+        public String getStatus() {
+            return status;
+        }
+
+        /** Returns the default task attempt ID of this illustrate run. */
+        @Override
+        public TaskAttemptID getTaskAttemptID() {
+            return taskAttemptID;
+        }
+
+        /** Records the status so that getStatus() can return it. */
+        @Override
+        public void setStatus(String arg0) {
+            status = arg0;
+        }
+
+        // ---- Job-level settings below are not modelled in illustrate mode. ----
+
+        @Override
+        public Path[] getArchiveClassPaths() {
+            return null;
+        }
+
+        @Override
+        public long[] getArchiveTimestamps() {
+            return null;
+        }
+
+        @Override
+        public URI[] getCacheArchives() throws IOException {
+            return null;
+        }
+
+        @Override
+        public URI[] getCacheFiles() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+                throws ClassNotFoundException {
+            return null;
+        }
+
+        /** Returns the configuration this context was created with. */
+        @Override
+        public Configuration getConfiguration() {
+            return conf;
+        }
+
+        @Override
+        public Credentials getCredentials() {
+            return null;
+        }
+
+        @Override
+        public Path[] getFileClassPaths() {
+            return null;
+        }
+
+        @Override
+        public long[] getFileTimestamps() {
+            return null;
+        }
+
+        @Override
+        public RawComparator<?> getGroupingComparator() {
+            return null;
+        }
+
+        @Override
+        public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+                throws ClassNotFoundException {
+            return null;
+        }
+
+        @Override
+        public String getJar() {
+            return null;
+        }
+
+        /** Returns the job ID of the default task attempt. */
+        @Override
+        public JobID getJobID() {
+            return taskAttemptID.getJobID();
+        }
+
+        @Override
+        public String getJobName() {
+            return null;
+        }
+
+        @Override
+        public boolean getJobSetupCleanupNeeded() {
+            return false;
+        }
+
+        @Override
+        public Path[] getLocalCacheArchives() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Path[] getLocalCacheFiles() throws IOException {
+            return null;
+        }
+
+        @Override
+        public Class<?> getMapOutputKeyClass() {
+            return null;
+        }
+
+        @Override
+        public Class<?> getMapOutputValueClass() {
+            return null;
+        }
+
+        @Override
+        public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+                throws ClassNotFoundException {
+            return null;
+        }
+
+        @Override
+        public int getMaxMapAttempts() {
+            return 0;
+        }
+
+        @Override
+        public int getMaxReduceAttempts() {
+            return 0;
+        }
+
+        @Override
+        public int getNumReduceTasks() {
+            return 0;
+        }
+
+        @Override
+        public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+                throws ClassNotFoundException {
+            return null;
+        }
+
+        @Override
+        public Class<?> getOutputKeyClass() {
+            return null;
+        }
+
+        @Override
+        public Class<?> getOutputValueClass() {
+            return null;
+        }
+
+        @Override
+        public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+                throws ClassNotFoundException {
+            return null;
+        }
+
+        @Override
+        public boolean getProfileEnabled() {
+            return false;
+        }
+
+        @Override
+        public String getProfileParams() {
+            return null;
+        }
+
+        @Override
+        public IntegerRanges getProfileTaskRange(boolean arg0) {
+            return null;
+        }
+
+        @Override
+        public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+                throws ClassNotFoundException {
+            return null;
+        }
+
+        @Override
+        public RawComparator<?> getSortComparator() {
+            return null;
+        }
+
+        @Override
+        public boolean getSymlink() {
+            return false;
+        }
+
+        @Override
+        public String getUser() {
+            return null;
+        }
+
+        @Override
+        public Path getWorkingDirectory() throws IOException {
+            return null;
+        }
+    }
+}
Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configuration.IntegerRanges;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.FakeRawKeyValueIterator;
+
+public class PigMapReduce extends PigGenericMapReduce {
+ /**
+ * Hadoop 0.23 reducer: identical to {@link PigGenericMapReduce.Reduce} except
+ * that it supplies its own in-memory {@link IllustratorContext}.
+ */
+ public static class Reduce extends PigGenericMapReduce.Reduce {
+ /**
+ * Get reducer's illustrator context
+ *
+ * @param job job whose configuration supplies the sort and grouping comparators
+ * @param input Input buffer as output by maps
+ * @param pkg package
+ * @return reducer's illustrator context
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Override
+ public Context getIllustratorContext(Job job,
+ List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException {
+ return new IllustratorContext(job, input, pkg);
+ }
+
+ /**
+ * In-memory reduce context used by the illustrator.
+ *
+ * The constructor sorts the supplied map output with the job's sort
+ * comparator. {@link #nextKey()} then walks that list, grouping consecutive
+ * entries that the job's grouping comparator considers equal. Anything
+ * passed to {@link #write} is discarded.
+ */
+ @SuppressWarnings("unchecked")
+ public class IllustratorContext extends Context {
+ private PigNullableWritable currentKey = null, nextKey = null;
+ private NullableTuple nextValue = null;
+ private List<NullableTuple> currentValues = null;
+ private Iterator<Pair<PigNullableWritable, Writable>> it;
+ // Scratch buffer: keys are serialized here so they can be compared as raw bytes.
+ private final ByteArrayOutputStream bos;
+ private final DataOutputStream dos;
+ private final RawComparator sortComparator, groupingComparator;
+ POPackage pack = null;
+
+ /**
+ * @param job job whose configuration defines the sort/grouping comparators
+ * @param input map output to reduce; sorted in place by this constructor
+ * @param pkg package operator stored in {@link #pack}
+ * @throws IOException if the new-API job cannot be created
+ * @throws InterruptedException propagated from job creation
+ */
+ public IllustratorContext(Job job,
+ List<Pair<PigNullableWritable, Writable>> input,
+ POPackage pkg
+ ) throws IOException, InterruptedException {
+ bos = new ByteArrayOutputStream();
+ dos = new DataOutputStream(bos);
+ org.apache.hadoop.mapreduce.Job nwJob = new org.apache.hadoop.mapreduce.Job(job.getJobConf());
+ sortComparator = nwJob.getSortComparator();
+ groupingComparator = nwJob.getGroupingComparator();
+
+ Collections.sort(input, new Comparator<Pair<PigNullableWritable, Writable>>() {
+ @Override
+ public int compare(Pair<PigNullableWritable, Writable> o1,
+ Pair<PigNullableWritable, Writable> o2) {
+ try {
+ // Serialize both keys back to back into the shared buffer,
+ // then compare the two byte ranges with the raw sort comparator.
+ o1.first.write(dos);
+ int l1 = bos.size();
+ o2.first.write(dos);
+ int l2 = bos.size();
+ byte[] bytes = bos.toByteArray();
+ return sortComparator.compare(bytes, 0, l1, bytes, l1, l2-l1);
+ } catch (IOException e) {
+ // Chain the original exception so its stack trace is not lost.
+ throw new RuntimeException("Serialization exception in sort:"+e.getMessage(), e);
+ } finally {
+ // Always clear the shared buffer, even if serialization failed part way,
+ // so a later comparison never sees stale bytes.
+ bos.reset();
+ }
+ }
+ }
+ );
+ currentValues = new ArrayList<NullableTuple>();
+ it = input.iterator();
+ // Prime the one-entry look-ahead that the first nextKey() call consumes.
+ if (it.hasNext()) {
+ Pair<PigNullableWritable, Writable> entry = it.next();
+ nextKey = entry.first;
+ nextValue = (NullableTuple) entry.second;
+ }
+ pack = pkg;
+ }
+
+ @Override
+ public PigNullableWritable getCurrentKey() {
+ return currentKey;
+ }
+
+ /**
+ * Advances to the next key group. The group contains the look-ahead value
+ * plus every following value whose key the grouping comparator deems equal
+ * to it. The first non-matching entry becomes the new look-ahead.
+ *
+ * @return {@code false} once the input is exhausted, {@code true} otherwise
+ */
+ @Override
+ public boolean nextKey() {
+ if (nextKey == null)
+ return false;
+ currentKey = nextKey;
+ currentValues.clear();
+ currentValues.add(nextValue);
+ nextKey = null;
+ while (it.hasNext()) {
+ Pair<PigNullableWritable, Writable> entry = it.next();
+ // NOTE(review): grouping uses the object-level compare rather than a raw
+ // byte comparison like the sort above. An earlier draft tried the raw
+ // variant here; why it was dropped is not recorded.
+ if (groupingComparator.compare(currentKey, entry.first) == 0) {
+ currentValues.add((NullableTuple)entry.second);
+ } else {
+ nextKey = entry.first;
+ nextValue = (NullableTuple) entry.second;
+ break;
+ }
+ }
+ return true;
+ }
+
+ /** @return the values collected for the current key by {@link #nextKey()} */
+ @Override
+ public Iterable<NullableTuple> getValues() {
+ return currentValues;
+ }
+
+ /** No-op: output written through this context is discarded. */
+ @Override
+ public void write(PigNullableWritable k, Writable t) {
+ }
+
+ /** No-op. */
+ @Override
+ public void progress() {
+ }
+
+ // ------------------------------------------------------------------
+ // The methods below are placeholder overrides of Reducer.Context members.
+ // They return null/false/0, and setStatus() ignores its argument.
+ // NOTE(review): presumably the illustrator never calls them; verify before
+ // relying on any of them. Values are exposed through getValues(), not
+ // getCurrentValue()/nextKeyValue().
+ // ------------------------------------------------------------------
+
+ @Override
+ public Counter getCounter(Enum<?> arg0) {
+ return null;
+ }
+
+ @Override
+ public Counter getCounter(String arg0, String arg1) {
+ return null;
+ }
+
+ @Override
+ public NullableTuple getCurrentValue() throws IOException,
+ InterruptedException {
+ return null;
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter() {
+ return null;
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException,
+ InterruptedException {
+ return false;
+ }
+
+ @Override
+ public String getStatus() {
+ return null;
+ }
+
+ @Override
+ public TaskAttemptID getTaskAttemptID() {
+ return null;
+ }
+
+ @Override
+ public void setStatus(String arg0) {
+ }
+
+ @Override
+ public Path[] getArchiveClassPaths() {
+ return null;
+ }
+
+ @Override
+ public long[] getArchiveTimestamps() {
+ return null;
+ }
+
+ @Override
+ public URI[] getCacheArchives() throws IOException {
+ return null;
+ }
+
+ @Override
+ public URI[] getCacheFiles() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getCombinerClass()
+ throws ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ return null;
+ }
+
+ @Override
+ public Path[] getFileClassPaths() {
+ return null;
+ }
+
+ @Override
+ public long[] getFileTimestamps() {
+ return null;
+ }
+
+ @Override
+ public RawComparator<?> getGroupingComparator() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends InputFormat<?, ?>> getInputFormatClass()
+ throws ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public String getJar() {
+ return null;
+ }
+
+ @Override
+ public JobID getJobID() {
+ return null;
+ }
+
+ @Override
+ public String getJobName() {
+ return null;
+ }
+
+ @Override
+ public boolean getJobSetupCleanupNeeded() {
+ return false;
+ }
+
+ @Override
+ public Path[] getLocalCacheArchives() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Path[] getLocalCacheFiles() throws IOException {
+ return null;
+ }
+
+ @Override
+ public Class<?> getMapOutputKeyClass() {
+ return null;
+ }
+
+ @Override
+ public Class<?> getMapOutputValueClass() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Mapper<?, ?, ?, ?>> getMapperClass()
+ throws ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public int getMaxMapAttempts() {
+ return 0;
+ }
+
+ @Override
+ public int getMaxReduceAttempts() {
+ return 0;
+ }
+
+ @Override
+ public int getNumReduceTasks() {
+ return 0;
+ }
+
+ @Override
+ public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
+ throws ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public Class<?> getOutputKeyClass() {
+ return null;
+ }
+
+ @Override
+ public Class<?> getOutputValueClass() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Partitioner<?, ?>> getPartitionerClass()
+ throws ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public boolean getProfileEnabled() {
+ return false;
+ }
+
+ @Override
+ public String getProfileParams() {
+ return null;
+ }
+
+ @Override
+ public IntegerRanges getProfileTaskRange(boolean isMap) {
+ return null;
+ }
+
+ @Override
+ public Class<? extends Reducer<?, ?, ?, ?>> getReducerClass()
+ throws ClassNotFoundException {
+ return null;
+ }
+
+ @Override
+ public RawComparator<?> getSortComparator() {
+ return null;
+ }
+
+ @Override
+ public boolean getSymlink() {
+ return false;
+ }
+
+ @Override
+ public String getUser() {
+ return null;
+ }
+
+ @Override
+ public Path getWorkingDirectory() throws IOException {
+ return null;
+ }
+ }
+ }
+}
Added: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (added)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.shims;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.ContextFactory;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+/**
+ * Hadoop 0.23 implementations of the context helpers Pig uses. Contexts are
+ * built from the org.apache.hadoop.mapreduce.task *Impl classes or cloned
+ * through {@link ContextFactory}.
+ */
+public class HadoopShims {
+ /**
+ * Clones {@code original} using its own configuration.
+ *
+ * @param original context to copy
+ * @return a new context carrying {@code original}'s configuration
+ */
+ static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
+ return ContextFactory.cloneContext(original, original.getConfiguration());
+ }
+
+ /** Builds a task-attempt context for {@code taskId} backed by {@code conf}. */
+ static public TaskAttemptContext createTaskAttemptContext(Configuration conf,
+ TaskAttemptID taskId) {
+ return new TaskAttemptContextImpl(conf, taskId);
+ }
+
+ /** Builds a job context for {@code jobId} backed by {@code conf}. */
+ static public JobContext createJobContext(Configuration conf,
+ JobID jobId) {
+ return new JobContextImpl(conf, jobId);
+ }
+
+ /** @return {@code true} exactly when the attempt's task type is {@link TaskType#MAP} */
+ static public boolean isMap(TaskAttemptID taskAttemptID) {
+ return taskAttemptID.getTaskType() == TaskType.MAP;
+ }
+}
Added: pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java (added)
+++ pig/trunk/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+/**
+ * Test cluster for Hadoop 0.20: a MiniDFSCluster plus a MiniMRCluster on the
+ * local machine, with the settings Pig's tests need.
+ */
+public class MiniCluster extends MiniGenericCluster {
+ private MiniMRCluster m_mr = null;
+ public MiniCluster() {
+ super();
+ }
+
+ /**
+ * Starts the DFS and MR clusters and writes the resulting job configuration
+ * to ~/pigtest/conf/hadoop-site.xml. It then sets the "cluster", "namenode"
+ * and "junit.hadoop.conf" system properties.
+ *
+ * @throws RuntimeException wrapping any IOException from cluster start-up or
+ * from writing the configuration file
+ */
+ @Override
+ protected void setupMiniDfsAndMrClusters() {
+ try {
+ final int dataNodes = 4; // There will be 4 data nodes
+ final int taskTrackers = 4; // There will be 4 task tracker nodes
+
+ // Create the configuration hadoop-site.xml file
+ File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
+ conf_dir.mkdirs();
+ File conf_file = new File(conf_dir, "hadoop-site.xml");
+
+ conf_file.delete();
+
+ // Builds and starts the mini dfs and mapreduce clusters
+ Configuration config = new Configuration();
+ m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+ m_fileSys = m_dfs.getFileSystem();
+ m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
+
+ // Write the necessary config info to hadoop-site.xml
+ m_conf = m_mr.createJobConf();
+ m_conf.setInt("mapred.submit.replication", 2);
+ m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+ m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+ m_conf.set("mapred.map.max.attempts", "2");
+ m_conf.set("mapred.reduce.max.attempts", "2");
+ // Close the stream explicitly; the previous code leaked the file handle.
+ FileOutputStream out = new FileOutputStream(conf_file);
+ try {
+ m_conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+
+ // Set the system properties needed by Pig
+ System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+ System.setProperty("namenode", m_conf.get("fs.default.name"));
+ System.setProperty("junit.hadoop.conf", conf_dir.getPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Stops the MR cluster, if one was started, and forgets it. */
+ @Override
+ protected void shutdownMiniMrClusters() {
+ if (m_mr != null) { m_mr.shutdown(); }
+ m_mr = null;
+ }
+}
Added: pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (added)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.TestMRJobs;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+
+/**
+ * Test cluster for Hadoop 0.23. It starts a mini DFS cluster and a mini YARN
+ * MapReduce cluster ({@link MiniMRYarnCluster}) on the local machine and sets
+ * up the environment for Pig to run on top of them.
+ */
+public class MiniCluster extends MiniGenericCluster {
+ protected MiniMRYarnCluster m_mr = null;
+ private Configuration m_dfs_conf = null;
+ private Configuration m_mr_conf = null;
+
+ public MiniCluster() {
+ super();
+ }
+
+ /**
+ * Starts the DFS and YARN clusters in several steps:
+ * <ul>
+ * <li>stages the jars the tasks need in DFS and adds them to the task
+ * classpath;</li>
+ * <li>writes the resulting configuration to
+ * ~/pigtest/conf/hadoop-site.xml;</li>
+ * <li>sets the "cluster", "namenode" and "junit.hadoop.conf" system
+ * properties.</li>
+ * </ul>
+ *
+ * @throws RuntimeException wrapping any IOException from start-up, jar
+ * staging or writing the configuration file
+ */
+ @Override
+ protected void setupMiniDfsAndMrClusters() {
+ Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO);
+ try {
+ final int dataNodes = 4; // There will be 4 data nodes
+
+ Logger.getRootLogger().setLevel(Level.TRACE);
+ // Create the configuration hadoop-site.xml file
+ File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
+ conf_dir.mkdirs();
+ File conf_file = new File(conf_dir, "hadoop-site.xml");
+
+ conf_file.delete();
+
+ // Builds and starts the mini dfs and mapreduce clusters
+ Configuration config = new Configuration();
+ m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
+ m_fileSys = m_dfs.getFileSystem();
+ m_dfs_conf = m_dfs.getConfiguration(0);
+
+ // NOTE(review): the YARN cluster is initialised with a fresh Configuration,
+ // not m_dfs_conf. The DFS address is copied into m_conf below instead.
+ m_mr = new MiniMRYarnCluster("PigMiniCluster");
+ m_mr.init(new Configuration());
+ m_mr.start();
+
+ // Work on a copy so the settings below do not modify the cluster's own config object.
+ m_mr_conf = new Configuration(m_mr.getConfig());
+
+ m_conf = m_mr_conf;
+ m_conf.set("fs.default.name", m_dfs_conf.get("fs.default.name"));
+
+ // Stage the jars the tasks need in DFS and put them on the task classpath.
+ // NOTE(review): these absolute file:/// locations are environment specific.
+ m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"), new Path("/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"));
+ m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"), new Path("/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"));
+ m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///pig.jar"), new Path("/pig.jar"));
+ m_dfs.getFileSystem().copyFromLocalFile(new Path("file:///pig-test.jar"), new Path("/pig-test.jar"));
+
+ DistributedCache.addFileToClassPath(new Path("/hadoop-mapreduce-client-app-1.0-SNAPSHOT.jar"), m_conf);
+ DistributedCache.addFileToClassPath(new Path("/pig.jar"), m_conf);
+ DistributedCache.addFileToClassPath(new Path("/pig-test.jar"), m_conf);
+ DistributedCache.addFileToClassPath(new Path("/hadoop-mapreduce-client-jobclient-1.0-SNAPSHOT.jar"), m_conf);
+
+ m_conf.setInt("mapred.submit.replication", 2);
+ m_conf.set("dfs.datanode.address", "0.0.0.0:0");
+ m_conf.set("dfs.datanode.http.address", "0.0.0.0:0");
+ m_conf.set("mapred.map.max.attempts", "2");
+ m_conf.set("mapred.reduce.max.attempts", "2");
+ // Close the stream explicitly; the previous code leaked the file handle.
+ FileOutputStream out = new FileOutputStream(conf_file);
+ try {
+ m_conf.writeXml(out);
+ } finally {
+ out.close();
+ }
+
+ System.err.println("XXX: Setting fs.default.name to: " + m_dfs_conf.get("fs.default.name"));
+ // Set the system properties needed by Pig
+ System.setProperty("cluster", m_conf.get("mapred.job.tracker"));
+ System.setProperty("namenode", m_conf.get("fs.default.name"));
+ System.setProperty("junit.hadoop.conf", conf_dir.getPath());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /** Stops the YARN MR cluster, if one was started, and forgets it. */
+ @Override
+ protected void shutdownMiniMrClusters() {
+ if (m_mr != null) { m_mr.stop(); }
+ m_mr = null;
+ }
+}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Tue Jul 19 01:01:53 2011
@@ -25,6 +25,8 @@ import java.util.Properties;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
public class ConfigurationUtil {
@@ -65,8 +67,13 @@ public class ConfigurationUtil {
}
public static Properties getLocalFSProperties() {
- Configuration localConf = new Configuration(false);
- localConf.addResource("core-default.xml");
+ Configuration localConf;
+ if (PigMapReduce.sJobContext!=null && ExecType.LOCAL.toString().equals(PigMapReduce.sJobContext.getConfiguration().get("exectype"))) { // constant-first equals: "exectype" may be unset (null)
+ localConf = new Configuration(false);
+ localConf.addResource("core-default.xml");
+ } else {
+ localConf = new Configuration(true);
+ }
Properties props = ConfigurationUtil.toProperties(localConf);
props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
return props;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Jul 19 01:01:53 2011
@@ -64,10 +64,11 @@ import org.apache.pig.pen.POOptimizeDisa
public class HExecutionEngine {
public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
- private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+ private static final String FILE_SYSTEM_LOCATION = "fs.defaultFS";
private static final String HADOOP_SITE = "hadoop-site.xml";
private static final String CORE_SITE = "core-site.xml";
+ private static final String YARN_SITE = "yarn-site.xml";
private final Log log = LogFactory.getLog(getClass());
public static final String LOCAL = "local";
@@ -155,6 +156,7 @@ public class HExecutionEngine {
jc = new JobConf();
jc.addResource("pig-cluster-hadoop-site.xml");
+ jc.addResource(YARN_SITE);
// Trick to invoke static initializer of DistributedFileSystem to add hdfs-default.xml
// into configuration
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Jul 19 01:01:53 2011
@@ -84,6 +84,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
@@ -1354,8 +1355,7 @@ public class MRCompiler extends PhyPlanV
Job job = new Job(conf);
loader.setLocation(location, job);
InputFormat inf = loader.getInputFormat();
- List<InputSplit> splits = inf.getSplits(new JobContext(
- job.getConfiguration(), job.getJobID()));
+ List<InputSplit> splits = inf.getSplits(HadoopShims.cloneJobContext(job));
List<List<InputSplit>> results = MapRedUtil
.getCombinePigSplits(splits, fs
.getDefaultBlockSize(), conf);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Jul 19 01:01:53 2011
@@ -55,6 +55,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -563,7 +565,7 @@ public class MapReduceLauncher extends L
* @throws IOException
*/
private void storeSchema(Job job, POStore st) throws IOException {
- JobContext jc = new JobContext(job.getJobConf(),
+ JobContext jc = HadoopShims.createJobContext(job.getJobConf(),
new org.apache.hadoop.mapreduce.JobID());
JobContext updatedJc = PigOutputCommitter.setUpContext(jc, st);
PigOutputCommitter.storeCleanup(st, updatedJc.getConfiguration());
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java?rev=1148117&r1=1148116&r2=1148117&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReducePOStoreImpl.java Tue Jul 19 01:01:53 2011
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.TaskI
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStoreImpl;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.PigStatusReporter;
/**
@@ -58,7 +60,8 @@ public class MapReducePOStoreImpl extend
// task (map or reduce) we could have multiple stores, we should
// make this copy so that the same context does not get over-written
// by the different stores.
- this.context = new TaskAttemptContext(outputConf,
+
+ this.context = HadoopShims.createTaskAttemptContext(outputConf,
context.getTaskAttemptID());
}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1148117&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Tue Jul 19 01:01:53 2011
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
+
+/**
+ * Base class for PigMapBase. PigMapBase differs slightly between Hadoop
+ * versions, so each version-specific implementation is located under
+ * $PIG_HOME/shims and extends this class.
+ */
+public abstract class PigGenericMapBase extends Mapper<Text, Tuple, PigNullableWritable, Writable> {
+    // Passed to leaf.getNext(); always null. Presumably it only selects the
+    // Tuple-typed getNext overload -- TODO confirm against PhysicalOperator.
+    private static final Tuple DUMMYTUPLE = null;
+
+    private final Log log = LogFactory.getLog(getClass());
+
+    // Key type deserialized from "pig.map.keytype" in setup().
+    protected byte keyType;
+
+    //Map Plan
+    protected PhysicalPlan mp = null;
+
+    // Store operators
+    protected List<POStore> stores;
+
+    protected TupleFactory tf = TupleFactory.getInstance();
+
+    // True when the task context is a PigMapBase.IllustratorContext (set in setup()).
+    boolean inIllustrator = false;
+
+    // Context cached on the first map() call so runPipeline() can also be
+    // driven from cleanup().
+    Context outputCollector;
+
+    // Reporter that will be used by operators
+    // to transmit heartbeat
+    ProgressableReporter pigReporter;
+
+    // Set when the pipeline reports STATUS_ERR; cleanup() then returns early.
+    protected boolean errorInMap = false;
+
+    // Plan operators that receive each input tuple (the split's target ops).
+    PhysicalOperator[] roots;
+
+    // First leaf of the map plan; only assigned when the plan is non-empty.
+    private PhysicalOperator leaf;
+
+    PigContext pigContext = null;
+
+    // Whether map() has performed its one-time, per-task initialization.
+    private volatile boolean initialized = false;
+
+    /**
+     * for local map/reduce simulation
+     * @param plan the map plan
+     */
+    public void setMapPlan(PhysicalPlan plan) {
+        mp = plan;
+    }
+
+    /**
+     * Called when all the tuples in the input have been processed.
+     *
+     * Flushes the pipeline once more when END_OF_INP_IN_MAP is set and tears
+     * down the store operators. It then calls finish() on the UDFs and clears
+     * the per-task state, including the static reporter. Returns immediately
+     * if an error was seen in map.
+     *
+     * @param context the task context supplied by the framework
+     * @throws IOException if flushing, tearing down stores or finishing UDFs fails
+     * @throws InterruptedException if collecting output is interrupted
+     */
+    @Override
+    public void cleanup(Context context) throws IOException, InterruptedException {
+        super.cleanup(context);
+        if(errorInMap) {
+            //error in map - returning
+            return;
+        }
+
+        // leaf is only assigned in setup() for a non-empty plan; with an empty
+        // plan there is nothing to flush and runPipeline(leaf) would NPE.
+        if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")
+                && !mp.isEmpty()) {
+            // If there is a stream in the pipeline or if this map job belongs to merge-join we could
+            // potentially have more to process - so lets
+            // set the flag stating that all map input has been sent
+            // already and then lets run the pipeline one more time
+            // This will result in nothing happening in the case
+            // where there is no stream or it is not a merge-join in the pipeline
+            if (outputCollector == null) {
+                // map() never ran, so no collector was cached; fall back to
+                // the context handed to cleanup() so collect() gets a non-null target.
+                outputCollector = context;
+            }
+            mp.endOfAllInput = true;
+            runPipeline(leaf);
+        }
+
+        for (POStore store: stores) {
+            if (!initialized) {
+                // map() never ran, so the store was never wired up; do it now
+                // so that tearDown() has something to tear down.
+                MapReducePOStoreImpl impl
+                    = new MapReducePOStoreImpl(context);
+                store.setStoreImpl(impl);
+                store.setUp();
+            }
+            store.tearDown();
+        }
+
+        try {
+            //Calling EvalFunc.finish()
+            UDFFinishVisitor finisher = new UDFFinishVisitor(mp,
+                    new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(mp));
+            try {
+                finisher.visit();
+            } catch (VisitorException e) {
+                int errCode = 2121;
+                String msg = "Error while calling finish method on UDFs.";
+                throw new VisitorException(msg, errCode, PigException.BUG, e);
+            }
+        } finally {
+            // Reset per-task state even when finish() fails, so the static
+            // reporter is not left pointing at this task.
+            mp = null;
+            PhysicalOperator.setReporter(null);
+            initialized = false;
+        }
+    }
+
+    /**
+     * Configures the mapper with the map plan and the
+     * reporter thread.
+     *
+     * Publishes the job configuration through PigMapReduce and deserializes
+     * the PigContext and (unless already set) the map plan. It also resolves
+     * the root operators for this split and the plan leaf.
+     *
+     * @param context the task context supplied by the framework
+     * @throws IOException if deserializing job state fails
+     * @throws InterruptedException if the framework interrupts setup
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+
+        Configuration job = context.getConfiguration();
+        SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+        PigMapReduce.sJobContext = context;
+        PigMapReduce.sJobConfInternal.set(context.getConfiguration());
+        PigMapReduce.sJobConf = context.getConfiguration();
+        inIllustrator = (context instanceof PigMapBase.IllustratorContext);
+
+        PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(job.get("udf.import.list")));
+        pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+        if (pigContext.getLog4jProperties()!=null)
+            PropertyConfigurator.configure(pigContext.getLog4jProperties());
+
+        // A plan injected via setMapPlan() (local simulation) takes precedence.
+        if (mp == null)
+            mp = (PhysicalPlan) ObjectSerializer.deserialize(
+                job.get("pig.mapPlan"));
+        stores = PlanHelper.getStores(mp);
+
+        // To be removed
+        // Rendering the plan is only useful for debug output, so skip it otherwise.
+        if (log.isDebugEnabled()) {
+            if(mp.isEmpty()) {
+                log.debug("Map Plan empty!");
+            } else {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                mp.explain(baos);
+                log.debug(baos.toString());
+            }
+        }
+        keyType = ((byte[])ObjectSerializer.deserialize(job.get("pig.map.keytype")))[0];
+        // till here
+
+        pigReporter = new ProgressableReporter();
+        // Get the UDF specific context
+        MapRedUtil.setupUDFContext(job);
+
+        if(!(mp.isEmpty())) {
+
+            PigSplit split = (PigSplit)context.getInputSplit();
+            List<OperatorKey> targetOpKeys = split.getTargetOps();
+
+            ArrayList<PhysicalOperator> targetOpsAsList = new ArrayList<PhysicalOperator>();
+            for (OperatorKey targetKey : targetOpKeys) {
+                targetOpsAsList.add(mp.getOperator(targetKey));
+            }
+            roots = targetOpsAsList.toArray(new PhysicalOperator[1]);
+            leaf = mp.getLeaves().get(0);
+        }
+
+        PigStatusReporter.setContext(context);
+
+    }
+
+    /**
+     * The map function that attaches the inpTuple appropriately
+     * and executes the map plan if its not empty. Collects the
+     * result of execution into oc or the input directly to oc
+     * if map plan empty. The collection is left abstract for the
+     * map-only or map-reduce job to implement. Map-only collects
+     * the tuple as-is whereas map-reduce collects it after extracting
+     * the key and indexed tuple.
+     *
+     * On the first call it also caches the collector and installs the
+     * reporter. It sets up the store operators (skipped when
+     * pigContext.inIllustrator is set) and installs the Pig logger.
+     */
+    @Override
+    protected void map(Text key, Tuple inpTuple, Context context) throws IOException, InterruptedException {
+        if(!initialized) {
+            initialized = true;
+            // cache the collector for use in runPipeline() which
+            // can be called from close()
+            this.outputCollector = context;
+            pigReporter.setRep(context);
+            PhysicalOperator.setReporter(pigReporter);
+
+            for (POStore store: stores) {
+                MapReducePOStoreImpl impl
+                    = new MapReducePOStoreImpl(context);
+                store.setStoreImpl(impl);
+                if (!pigContext.inIllustrator)
+                    store.setUp();
+            }
+
+            boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
+
+            PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+            pigHadoopLogger.setAggregate(aggregateWarning);
+            pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
+
+            PhysicalOperator.setPigLogger(pigHadoopLogger);
+        }
+
+        if (mp.isEmpty()) {
+            collect(context,inpTuple);
+            return;
+        }
+
+        for (PhysicalOperator root : roots) {
+            if (inIllustrator) {
+                // The illustrator passes the tuple through uncopied and tolerates missing roots.
+                if (root != null) {
+                    root.attachInput(inpTuple);
+                }
+            } else {
+                root.attachInput(tf.newTupleNoCopy(inpTuple.getAll()));
+            }
+        }
+
+        runPipeline(leaf);
+    }
+
+    /**
+     * Drains results from the given leaf until it reports end of processing.
+     *
+     * STATUS_OK results are handed to collect() with the cached collector, and
+     * STATUS_NULL results are skipped. STATUS_EOP returns. STATUS_ERR marks
+     * errorInMap and throws.
+     *
+     * @param leaf the plan leaf to pull results from
+     * @throws ExecException (error code 2055) when the pipeline reports an error
+     */
+    protected void runPipeline(PhysicalOperator leaf) throws IOException, InterruptedException {
+        while(true){
+            Result res = leaf.getNext(DUMMYTUPLE);
+            if(res.returnStatus==POStatus.STATUS_OK){
+                collect(outputCollector,(Tuple)res.result);
+                continue;
+            }
+
+            if(res.returnStatus==POStatus.STATUS_EOP) {
+                return;
+            }
+
+            if(res.returnStatus==POStatus.STATUS_NULL)
+                continue;
+
+            if(res.returnStatus==POStatus.STATUS_ERR){
+                // remember that we had an issue so that in
+                // close() we can do the right thing
+                errorInMap = true;
+                // if there is an errmessage use it
+                String errMsg;
+                if(res.result != null) {
+                    errMsg = "Received Error while " +
+                    "processing the map plan: " + res.result;
+                } else {
+                    errMsg = "Received Error while " +
+                    "processing the map plan.";
+                }
+
+                int errCode = 2055;
+                ExecException ee = new ExecException(errMsg, errCode, PigException.BUG);
+                throw ee;
+            }
+        }
+
+    }
+
+    /**
+     * Emits one output tuple; map-only and map-reduce subclasses differ here.
+     *
+     * @param oc the context to write to
+     * @param tuple the tuple produced by the plan (or the raw input for an empty plan)
+     */
+    abstract public void collect(Context oc, Tuple tuple) throws InterruptedException, IOException;
+
+    /**
+     * @return the keyType
+     */
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    /**
+     * @param keyType the keyType to set
+     */
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+
+    /**
+     * Builds the version-specific illustrator context (implemented in the shims).
+     */
+    abstract public Context getIllustratorContext(Configuration conf, DataBag input,
+          List<Pair<PigNullableWritable, Writable>> output, InputSplit split)
+          throws IOException, InterruptedException;
+}