You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/13 20:47:57 UTC
svn commit: r1541677 - in /pig/branches/tez: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/
src/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/data/ src/org/apach...
Author: cheolsoo
Date: Wed Nov 13 19:47:56 2013
New Revision: 1541677
URL: http://svn.apache.org/r1541677
Log:
Merge latest trunk changes into tez branch
Modified:
pig/branches/tez/ (props changed)
pig/branches/tez/CHANGES.txt
pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java
pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
pig/branches/tez/src/pig-default.properties (props changed)
pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2 (props changed)
Propchange: pig/branches/tez/
------------------------------------------------------------------------------
Merged /pig/trunk:r1540916-1541675
Modified: pig/branches/tez/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/tez/CHANGES.txt?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/CHANGES.txt (original)
+++ pig/branches/tez/CHANGES.txt Wed Nov 13 19:47:56 2013
@@ -28,6 +28,8 @@ PIG-3419: Pluggable Execution Engine (ac
IMPROVEMENTS
+PIG-3505: Make AvroStorage sync interval take default from io.file.buffer.size (rohini)
+
PIG-3563: support adding archives to the distributed cache (jdonofrio via cheolsoo)
PIG-3388: No support for Regex for row filter in org.apache.pig.backend.hadoop.hbase.HBaseStorage (lbendig via cheolsoo)
@@ -48,8 +50,6 @@ OPTIMIZATIONS
BUG FIXES
-PIG-3507: It fails to run pig in local mode on a Kerberos enabled Hadoop cluster (chiyang via cheolsoo)
-
PIG-3561: Clean up PigStats and JobStats after PIG-3419 (cheolsoo)
PIG-3553: HadoopJobHistoryLoader fails to load job history on hadoop v 1.2 (lgiri via cheolsoo)
@@ -102,6 +102,8 @@ PIG-3480: TFile-based tmpfile compressio
BUG FIXES
+PIG-3570: Rollback PIG-3060 (daijy)
+
PIG-3530: Some e2e tests is broken due to PIG-3480 (daijy)
PIG-3492: ColumnPrune dropping used column due to LogicalRelationalOperator.fixDuplicateUids changes not propagating (knoguchi via daijy)
Modified: pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java (original)
+++ pig/branches/tez/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/PigAvroOutputFormat.java Wed Nov 13 19:47:56 2013
@@ -5,9 +5,9 @@
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
@@ -17,6 +17,7 @@
package org.apache.pig.piggybank.storage.avro;
+import static org.apache.avro.file.DataFileConstants.DEFAULT_SYNC_INTERVAL;
import java.io.IOException;
import org.apache.avro.Schema;
@@ -32,7 +33,7 @@ import org.apache.hadoop.mapreduce.lib.o
/**
* The OutputFormat for avro data.
- *
+ *
*/
public class PigAvroOutputFormat extends FileOutputFormat<NullWritable, Object> {
@@ -47,10 +48,13 @@ public class PigAvroOutputFormat extends
/** The configuration key for the Avro codec. */
public static final String OUTPUT_CODEC = "avro.output.codec";
-
+
/** The deflate codec */
public static final String DEFLATE_CODEC = "deflate";
+ /** The configuration key for Avro sync interval. */
+ public static final String SYNC_INTERVAL_KEY = "avro.mapred.sync.interval";
+
/* avro schema of output data */
private Schema schema = null;
@@ -61,16 +65,16 @@ public class PigAvroOutputFormat extends
}
/**
- * construct with specified output schema
+ * construct with specified output schema
* @param s output schema
*/
public PigAvroOutputFormat(Schema s) {
schema = s;
}
- /**
- * Enable output compression using the deflate codec and
- * specify its level.
+ /**
+ * Enable output compression using the deflate codec and
+ * specify its level.
*/
public static void setDeflateLevel(Job job, int level) {
FileOutputFormat.setCompressOutput(job, true);
@@ -83,7 +87,7 @@ public class PigAvroOutputFormat extends
if (schema == null)
throw new IOException("Must provide a schema");
-
+
Configuration conf = context.getConfiguration();
DataFileWriter<Object> writer = new DataFileWriter<Object>(new PigAvroDatumWriter(schema));
@@ -97,6 +101,10 @@ public class PigAvroOutputFormat extends
writer.setCodec(factory);
}
+ // Do max as core-default.xml has io.file.buffer.size as 4K
+ writer.setSyncInterval(conf.getInt(SYNC_INTERVAL_KEY, Math.max(
+ conf.getInt("io.file.buffer.size", DEFAULT_SYNC_INTERVAL), DEFAULT_SYNC_INTERVAL)));
+
Path path = getDefaultWorkFile(context, EXT);
writer.create(schema, path.getFileSystem(conf).create(path));
return new PigAvroRecordWriter(writer);
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Nov 13 19:47:56 2013
@@ -441,12 +441,12 @@ public class POForEach extends PhysicalO
if(its[i].hasNext()) {
data[i] = its[i].next();
} else {
- //the input set is null, so we return with EOP. This is
+ //the input set is null, so we return. This is
// caught above and this function recalled with
// new inputs.
its = null;
data = null;
- res.returnStatus = POStatus.STATUS_EOP;
+ res.returnStatus = POStatus.STATUS_NULL;
return res;
}
} else {
Modified: pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java Wed Nov 13 19:47:56 2013
@@ -97,7 +97,7 @@ public abstract class DefaultAbstractBag
}
for (int i = sampled; iter.hasNext() && sampled < SPILL_SAMPLE_SIZE; i++) {
Tuple t = iter.next();
- if (i % SPILL_SAMPLE_FREQUENCY == 0) {
+ if (t != null && i % SPILL_SAMPLE_FREQUENCY == 0) {
aggSampleTupleSize += t.getMemorySize();
sampled += 1;
}
Modified: pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/avro/AvroRecordWriter.java Wed Nov 13 19:47:56 2013
@@ -54,7 +54,7 @@ public class AvroRecordWriter extends Re
private DataFileWriter<GenericData.Record> writer;
private Path out;
private Configuration conf;
-
+
/**
* Creates new AvroRecordWriter.
* @param s Schema for the files on this output path
@@ -81,8 +81,9 @@ public class AvroRecordWriter extends Re
writer.setCodec(factory);
}
- writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY,
- DEFAULT_SYNC_INTERVAL));
+ // Do max as core-default.xml has io.file.buffer.size as 4K
+ writer.setSyncInterval(job.getInt(SYNC_INTERVAL_KEY, Math.max(
+ job.getInt("io.file.buffer.size", DEFAULT_SYNC_INTERVAL), DEFAULT_SYNC_INTERVAL)));
// copy metadata from job
for (Map.Entry<String,String> e : job) {
@@ -95,7 +96,7 @@ public class AvroRecordWriter extends Re
.getBytes("ISO-8859-1"));
}
}
-
+
@Override
public void close(final TaskAttemptContext arg0)
throws IOException, InterruptedException {
@@ -115,7 +116,7 @@ public class AvroRecordWriter extends Re
.packIntoAvro((Tuple) value, schema));
}
}
-
+
public void prepareToWrite(Schema s) throws IOException {
if (s == null) {
throw new IOException(
@@ -129,5 +130,5 @@ public class AvroRecordWriter extends Re
writer.create(s, out.getFileSystem(conf).create(out));
}
-
+
}
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Wed Nov 13 19:47:56 2013
@@ -19,9 +19,7 @@ package org.apache.pig.newplan.logical.r
import java.io.IOException;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
@@ -64,16 +62,14 @@ public class InputOutputFileValidator {
try {
if(store.getSchema() != null){
- sf.checkSchema(new ResourceSchema(store.getSchema(), store.getSortInfo()));
+ sf.checkSchema(new ResourceSchema(store.getSchema(), store.getSortInfo()));
}
- Configuration conf = ConfigurationUtil.toConfiguration(pigCtx.getProperties());
- dummyJob = new Job(conf);
+ dummyJob = new Job(ConfigurationUtil.toConfiguration(pigCtx.getProperties()));
sf.setStoreLocation(outLoc, dummyJob);
- UserGroupInformation.setConfiguration(conf);
} catch (Exception ioe) {
if(ioe instanceof PigException){
errCode = ((PigException)ioe).getErrorCode();
- }
+ }
String exceptionMsg = ioe.getMessage();
validationErrStr += (exceptionMsg == null) ? "" : " More info to follow:\n" +exceptionMsg;
throw new VisitorException(store, validationErrStr, errCode, pigCtx.getErrorSource(), ioe);
Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
Merged /pig/trunk/src/pig-default.properties:r1540916-1541675
Modified: pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1541677&r1=1541676&r2=1541677&view=diff
==============================================================================
--- pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/tez/test/org/apache/pig/test/TestEvalPipelineLocal.java Wed Nov 13 19:47:56 2013
@@ -1173,4 +1173,34 @@ public class TestEvalPipelineLocal {
Assert.assertFalse(iter.hasNext());
}
+ static public class GenBag extends EvalFunc<DataBag> {
+ @Override
+ public DataBag exec(Tuple input) throws IOException {
+ Integer content = (Integer)input.get(0);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+
+ if (content > 10) {
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(content);
+ bag.add(t);
+ }
+ return bag;
+ }
+ }
+ // Two flatten statement in a pipeline, see PIG-3292
+ @Test
+ public void testFlattenTwice() throws Exception{
+ File f1 = createFile(new String[]{"{(1),(12),(9)}", "{(15),(2)}"});
+
+ pigServer.registerQuery("a = load '" + Util.encodeEscape(Util.generateURI(f1.toString(), pigServer.getPigContext()))
+ + "' as (bag1:bag{(t:int)});");
+ pigServer.registerQuery("b = foreach a generate flatten(bag1) as field1;");
+ pigServer.registerQuery("c = foreach b generate flatten(" + GenBag.class.getName() + "(field1));");
+
+ Iterator<Tuple> iter = pigServer.openIterator("c");
+ Assert.assertEquals(iter.next().toString(), "(12)");
+ Assert.assertEquals(iter.next().toString(), "(15)");
+
+ Assert.assertFalse(iter.hasNext());
+ }
}
Propchange: pig/branches/tez/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1540916-1541675