You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/10/31 00:37:45 UTC

svn commit: r1635640 - in /pig/branches/branch-0.14: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/ src/org/apache/pig/builtin/ test/org/apache/pig...

Author: daijy
Date: Thu Oct 30 23:37:44 2014
New Revision: 1635640

URL: http://svn.apache.org/r1635640
Log:
PIG-4253: Add a SequenceID UDF

Added:
    pig/branches/branch-0.14/src/org/apache/pig/builtin/SequenceID.java
Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/src/org/apache/pig/PigConstants.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    pig/branches/branch-0.14/test/org/apache/pig/test/TestBuiltin.java

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1635640&r1=1635639&r2=1635640&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Thu Oct 30 23:37:44 2014
@@ -95,6 +95,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4253: Add a SequenceID UDF (daijy)
+
 PIG-4166: Collected group drops last record when combined with merge join (bridiver via daijy)
 
 PIG-2495: Using merge JOIN from a HBaseStorage produces an error (bridiver via daijy)

Modified: pig/branches/branch-0.14/src/org/apache/pig/PigConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/PigConstants.java?rev=1635640&r1=1635639&r2=1635640&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/PigConstants.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/PigConstants.java Thu Oct 30 23:37:44 2014
@@ -57,4 +57,6 @@ public class PigConstants {
      */
     public static final String TIME_UDFS_INVOCATION_COUNTER = "approx_invocations";
     public static final String TIME_UDFS_ELAPSED_TIME_COUNTER = "approx_microsecs";
+
+    public static final String TASK_INDEX = "mapreduce.task.index";
 }
\ No newline at end of file

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java?rev=1635640&r1=1635639&r2=1635640&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java Thu Oct 30 23:37:44 2014
@@ -23,7 +23,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.joda.time.DateTimeZone;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -32,6 +31,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -162,6 +162,7 @@ public abstract class PigGenericMapBase 
 
         Configuration job = context.getConfiguration();
         SpillableMemoryManager.configure(ConfigurationUtil.toProperties(job));
+        context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
         PigMapReduce.sJobContext = context;
         PigMapReduce.sJobConfInternal.set(context.getConfiguration());
         PigMapReduce.sJobConf = context.getConfiguration();

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1635640&r1=1635639&r2=1635640&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Thu Oct 30 23:37:44 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.jobcontr
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.pig.JVMReuseManager;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -319,6 +320,7 @@ public class PigGenericMapReduce {
                 pack = getPack(context);
             Configuration jConf = context.getConfiguration();
             SpillableMemoryManager.configure(ConfigurationUtil.toProperties(jConf));
+            context.getConfiguration().set(PigConstants.TASK_INDEX, Integer.toString(context.getTaskAttemptID().getTaskID().getId()));
             sJobContext = context;
             sJobConfInternal.set(context.getConfiguration());
             sJobConf = context.getConfiguration();

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1635640&r1=1635639&r2=1635640&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Thu Oct 30 23:37:44 2014
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.JVMReuseImpl;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -121,6 +122,7 @@ public class PigProcessor extends Abstra
 
         // To determine front-end in UDFContext
         conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier());
+        conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex()));
         UDFContext.getUDFContext().addJobConf(conf);
         UDFContext.getUDFContext().deserialize();
 

Added: pig/branches/branch-0.14/src/org/apache/pig/builtin/SequenceID.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/builtin/SequenceID.java?rev=1635640&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/builtin/SequenceID.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/builtin/SequenceID.java Thu Oct 30 23:37:44 2014
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.builtin;
+
+import java.io.IOException;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigConstants;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * SequenceID generates a unique id for each record in the job. The id is
+ * stable across task retries. Any arguments to the function are ignored.
+ * Example:
+ *      A = load 'mydata' as (name);
+ *      B = foreach A generate name, SequenceID();
+ * Each id takes the form "index-sequence".
+ */
+public class SequenceID extends EvalFunc<String> {
+    long sequence = 0;
+    /** Returns the task index from the job conf joined by "-" to this instance's counter, then advances the counter. */
+    @Override
+    public String exec(Tuple input) throws IOException {
+        // Read the task index before touching the counter so a failed lookup leaves the counter unchanged.
+        final String prefix = PigMapReduce.sJobConfInternal.get().get(PigConstants.TASK_INDEX) + "-";
+        return prefix + sequence++;
+    }
+
+    /** Declares the result as a single chararray field named "sequenceid"; the input schema is not consulted. */
+    @Override
+    public Schema outputSchema(Schema input) {
+        return new Schema(new Schema.FieldSchema("sequenceid", DataType.CHARARRAY));
+    }
+}

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/TestBuiltin.java?rev=1635640&r1=1635639&r2=1635640&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/TestBuiltin.java Thu Oct 30 23:37:44 2014
@@ -3129,4 +3129,28 @@ public class TestBuiltin {
         
     }
 
+    @Test
+    public void testSequenceID() throws Exception {
+        Util.resetStateForExecModeSwitch();
+        String inputFileName = "testSequenceID.txt";
+        Util.createInputFile(cluster, inputFileName, new String[]
+            {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+        pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
+        pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+        pigServer.registerQuery("B = foreach A generate name, SequenceID();");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        // Ids are "<task index>-<sequence>"; assert them, since a bare equals() call discards its result.
+        org.junit.Assert.assertEquals("0-0", iter.next().get(1));
+        org.junit.Assert.assertEquals("0-1", iter.next().get(1));
+        org.junit.Assert.assertEquals("0-2", iter.next().get(1));
+        org.junit.Assert.assertEquals("0-3", iter.next().get(1));
+        org.junit.Assert.assertEquals("0-4", iter.next().get(1));
+        org.junit.Assert.assertEquals("1-0", iter.next().get(1));
+        org.junit.Assert.assertEquals("1-1", iter.next().get(1));
+        org.junit.Assert.assertEquals("1-2", iter.next().get(1));
+        org.junit.Assert.assertEquals("1-3", iter.next().get(1));
+        org.junit.Assert.assertEquals("1-4", iter.next().get(1));
+    }
 }