You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/13 20:47:57 UTC

svn commit: r1541677 - in /pig/branches/tez: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/ src/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/data/ src/org/apach...

Author: cheolsoo
Date: Wed Nov 13 19:47:56 2013
New Revision: 1541677

URL: http://svn.apache.org/r1541677
Log:
Merge latest trunk changes into tez branch

Modified:
    pig/branches/tez/   (props changed)
    pig/branches/tez/CHANGES.txt
    pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
    pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java
    pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
    pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
    pig/branches/tez/src/pig-default.properties   (props changed)
    pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
    pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2   (props changed)

Propchange: pig/branches/tez/
------------------------------------------------------------------------------
  Merged /pig/trunk:r1540916-1541675

Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Wed Nov 13 19:47:56 2013
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
 
 IMPROVEMENTS
 
+PIG-3505: Make AvroStorage sync interval take default from io.file.buffer.size (rohini)
+
 PIG-3563: support adding archives to the distributed cache (jdonofrio via cheolsoo)
 
 PIG-3388: No support for Regex for row filter in org.apache.pig.backend.hadoop.hbase.HBaseStorage (lbendig via cheolsoo)
@@ -48,8 +50,6 @@ OPTIMIZATIONS
  
 BUG FIXES
 
-PIG-3507: It fails to run pig in local mode on a Kerberos enabled Hadoop cluster (chiyang via cheolsoo)
-
 PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)
 
 PIG-3553: HadoopJobHistoryLoader fails to load job history on hadoop v 1.2 (lgiri via cheolsoo)
@@ -102,6 +102,8 @@ PIG-3480: TFile-based tmpfile compressio
 
 BUG FIXES
 
+PIG-3570: Rollback PIG-3060 (daijy)
+
 PIG-3530: Some e2e tests is broken due to PIG-3480 (daijy)
 
 PIG-3492: ColumnPrune dropping used column due to LogicalRelationalOperator.fixDuplicateUids changes not propagating (knoguchi via daijy)

Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java Wed Nov 13 19:47:56 2013
@@ -5,9 +5,9 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,6 +17,7 @@
 
 package org.apache.pig.piggybank.storage.avro;
 
+import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
 import java.io.IOException;
 
 import org.apache.avro.Schema;
@@ -32,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.o
 
 /**
  * The OutputFormat for avro data.
- * 
+ *
  */
 public class PigAvroOutputFormat extends FileOutputFormat<NullWritable, Object> {
 
@@ -47,10 +48,13 @@ public class PigAvroOutputFormat extends
 
     /** The configuration key for the Avro codec. */
     public static final String OUTPUT_CODEC = "avro.output.codec";
-    
+
     /** The deflate codec */
     public static final String DEFLATE_CODEC = "deflate";
 
+    /** The configuration key for Avro sync interval. */
+    public static final String SYNC_INTERVAL_KEY = "avro.mapred.sync.interval";
+
     /* avro schema of output data */
     private Schema schema = null;
 
@@ -61,16 +65,16 @@ public class PigAvroOutputFormat extends
     }
 
     /**
-     * construct with specified output schema 
+     * construct with specified output schema
      * @param s             output schema
      */
     public PigAvroOutputFormat(Schema s) {
         schema = s;
     }
 
-    /** 
-     * Enable output compression using the deflate codec and 
-     * specify its level. 
+    /**
+     * Enable output compression using the deflate codec and
+     * specify its level.
      */
     public static void setDeflateLevel(Job job, int level) {
         FileOutputFormat.setCompressOutput(job, true);
@@ -83,7 +87,7 @@ public class PigAvroOutputFormat extends
 
         if (schema == null)
             throw new IOException("Must provide a schema");
-        
+
         Configuration conf = context.getConfiguration();
 
         DataFileWriter<Object> writer = new DataFileWriter<Object>(new PigAvroDatumWriter(schema));
@@ -97,6 +101,10 @@ public class PigAvroOutputFormat extends
             writer.setCodec(factory);
         }
 
+        // Do max as core-default.xml has io.file.buffer.size as 4K
+        writer.setSyncInterval(conf.getInt(SYNC_INTERVAL_KEY, Math.max(
+                conf.getInt("io.file.buffer.size", DEFAULT_SYNC_INTERVAL), DEFAULT_SYNC_INTERVAL)));
+
         Path path = getDefaultWorkFile(context, EXT);
         writer.create(schema, path.getFileSystem(conf).create(path));
         return new PigAvroRecordWriter(writer);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Nov 13 19:47:56 2013
@@ -441,12 +441,12 @@ public class POForEach extends PhysicalO
                         if(its[i].hasNext()) {
                             data[i] = its[i].next();
                         } else {
-                            //the input set is null, so we return with EOP.  This is
+                            //the input set is null, so we return.  This is
                             // caught above and this function recalled with
                             // new inputs.
                             its = null;
                             data = null;
-                            res.returnStatus = POStatus.STATUS_EOP;
+                            res.returnStatus = POStatus.STATUS_NULL;
                             return res;
                         }
                     } else {

Modified: pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java Wed Nov 13 19:47:56 2013
@@ -97,7 +97,7 @@ public abstract class DefaultAbstractBag
             }
             for (int i = sampled; iter.hasNext() && sampled < SPILL_SAMPLE_SIZE; i++) {
                 Tuple t = iter.next();
-                if (i % SPILL_SAMPLE_FREQUENCY == 0) {
+                if (t != null && i % SPILL_SAMPLE_FREQUENCY == 0) {
                     aggSampleTupleSize += t.getMemorySize();
                     sampled += 1;
                 }

Modified: pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java Wed Nov 13 19:47:56 2013
@@ -54,7 +54,7 @@ public class AvroRecordWriter extends Re
   private DataFileWriter<GenericData.Record> writer;
   private Path out;
   private Configuration conf;
-  
+
   /**
    * Creates new AvroRecordWriter.
    * @param s Schema for the files on this output path
@@ -81,8 +81,9 @@ public class AvroRecordWriter extends Re
       writer.setCodec(factory);
     }
 
-    writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY,
-        DEFAULT_SYNC_INTERVAL));
+    // Do max as core-default.xml has io.file.buffer.size as 4K
+    writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY, Math.max(
+            job.getInt("io.file.buffer.size", DEFAULT_SYNC_INTERVAL), DEFAULT_SYNC_INTERVAL)));
 
     // copy metadata from job
     for (Map.Entry<String,String> e : job) {
@@ -95,7 +96,7 @@ public class AvroRecordWriter extends Re
                        .getBytes("ISO-8859-1"));
     }
   }
-  
+
   @Override
   public void close(final TaskAttemptContext arg0)
       throws IOException, InterruptedException {
@@ -115,7 +116,7 @@ public class AvroRecordWriter extends Re
           .packIntoAvro((Tuple) value, schema));
     }
   }
-  
+
   public void prepareToWrite(Schema s) throws IOException {
     if (s == null) {
       throw new IOException(
@@ -129,5 +130,5 @@ public class AvroRecordWriter extends Re
     writer.create(s, out.getFileSystem(conf).create(out));
 
   }
-  
+
 }

Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Wed Nov 13 19:47:56 2013
@@ -19,9 +19,7 @@ package org.apache.pig.newplan.logical.r
 
 import java.io.IOException;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.StoreFuncInterface;
@@ -64,16 +62,14 @@ public class InputOutputFileValidator {
             
             try {
                 if(store.getSchema() != null){
-                    sf.checkSchema(new ResourceSchema(store.getSchema(), store.getSortInfo()));
+                    sf.checkSchema(new ResourceSchema(store.getSchema(), store.getSortInfo()));                
                 }
-                Configuration conf = ConfigurationUtil.toConfiguration(pigCtx.getProperties());
-                dummyJob = new Job(conf);
+                dummyJob = new Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));
                 sf.setStoreLocation(outLoc, dummyJob);
-                UserGroupInformation.setConfiguration(conf);
             } catch (Exception ioe) {
                 if(ioe instanceof PigException){
                     errCode = ((PigException)ioe).getErrorCode();
-                }
+                } 
                 String exceptionMsg = ioe.getMessage();
                 validationErrStr += (exceptionMsg == null) ? "" : " More info to follow:\n" +exceptionMsg;
                 throw new VisitorException(store, validationErrStr, errCode, pigCtx.getErrorSource(), ioe);

Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
  Merged /pig/trunk/src/pig-default.properties:r1540916-1541675

Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java Wed Nov 13 19:47:56 2013
@@ -1173,4 +1173,34 @@ public class TestEvalPipelineLocal {
         
         Assert.assertFalse(iter.hasNext());
     }
+    static public class GenBag extends EvalFunc<DataBag> {
+        @Override
+        public DataBag exec(Tuple input) throws IOException {
+            Integer content = (Integer)input.get(0);
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
+
+            if (content > 10) {
+                Tuple t = TupleFactory.getInstance().newTuple();
+                t.append(content);
+                bag.add(t);
+            }
+            return bag;
+        }
+    }
+    // Two flatten statement in a pipeline, see PIG-3292
+    @Test
+    public void testFlattenTwice() throws Exception{
+        File f1 = createFile(new String[]{"{(1),(12),(9)}", "{(15),(2)}"});
+        
+        pigServer.registerQuery("a = load '" + Util.encodeEscape(Util.generateURI(f1.toString(), pigServer.getPigContext()))
+                + "' as (bag1:bag{(t:int)});");
+        pigServer.registerQuery("b = foreach a generate flatten(bag1) as field1;");
+        pigServer.registerQuery("c = foreach b generate flatten(" + GenBag.class.getName() + "(field1));");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("c");
+        Assert.assertEquals(iter.next().toString(), "(12)");
+        Assert.assertEquals(iter.next().toString(), "(15)");
+        
+        Assert.assertFalse(iter.hasNext());
+    }
 }

Propchange: pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
  Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1540916-1541675