You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/02/24 22:41:41 UTC
svn commit: r1571454 [4/5] - in /pig/branches/tez: ./ conf/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/string/
contrib/piggybank/java/src/main/java/...
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Feb 24 21:41:38 2014
@@ -37,6 +37,8 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
@@ -166,7 +168,7 @@ public class HBaseStorage extends LoadFu
private final long limit_;
private final boolean cacheBlocks_;
private final int caching_;
- private final boolean noWAL_;
+ private boolean noWAL_;
private final long minTimestamp_;
private final long maxTimestamp_;
private final long timestamp_;
@@ -183,7 +185,8 @@ public class HBaseStorage extends LoadFu
private RequiredFieldList requiredFieldList;
private static void populateValidOptions() {
- validOptions_.addOption("loadKey", false, "Load Key");
+ Option loadKey = OptionBuilder.hasOptionalArgs(1).withArgName("loadKey").withLongOpt("loadKey").withDescription("Load Key").create();
+ validOptions_.addOption(loadKey);
validOptions_.addOption("gt", true, "Records must be greater than this value " +
"(binary, double-slash-escaped)");
validOptions_.addOption("lt", true, "Records must be less than this value (binary, double-slash-escaped)");
@@ -197,11 +200,11 @@ public class HBaseStorage extends LoadFu
validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when parsing columns");
validOptions_.addOption("caster", true, "Caster to use for converting values. A class name, " +
"HBaseBinaryConverter, or Utf8StorageConverter. For storage, casters must implement LoadStoreCaster.");
- validOptions_.addOption("noWAL", false, "Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).");
+ Option noWal = OptionBuilder.hasOptionalArgs(1).withArgName("noWAL").withLongOpt("noWAL").withDescription("Sets the write ahead to false for faster loading. To be used with extreme caution since this could result in data loss (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).").create();
+ validOptions_.addOption(noWal);
validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value");
validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less then this value");
validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
-
}
/**
@@ -263,7 +266,13 @@ public class HBaseStorage extends LoadFu
throw e;
}
- loadRowKey_ = configuredOptions_.hasOption("loadKey");
+ loadRowKey_ = false;
+ if (configuredOptions_.hasOption("loadKey")) {
+ String value = configuredOptions_.getOptionValue("loadKey");
+ if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {//the empty string and null check is for backward compat.
+ loadRowKey_ = true;
+ }
+ }
delimiter_ = ",";
if (configuredOptions_.getOptionValue("delim") != null) {
@@ -302,7 +311,13 @@ public class HBaseStorage extends LoadFu
caching_ = Integer.valueOf(configuredOptions_.getOptionValue("caching", "100"));
cacheBlocks_ = Boolean.valueOf(configuredOptions_.getOptionValue("cacheBlocks", "false"));
limit_ = Long.valueOf(configuredOptions_.getOptionValue("limit", "-1"));
- noWAL_ = configuredOptions_.hasOption("noWAL");
+ noWAL_ = false;
+ if (configuredOptions_.hasOption("noWAL")) {
+ String value = configuredOptions_.getOptionValue("noWAL");
+ if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null) {//the empty string and null check is for backward compat.
+ noWAL_ = true;
+ }
+ }
if (configuredOptions_.hasOption("minTimestamp")){
minTimestamp_ = Long.parseLong(configuredOptions_.getOptionValue("minTimestamp"));
Modified: pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/BinStorage.java Mon Feb 24 21:41:38 2014
@@ -143,8 +143,8 @@ implements StoreFuncInterface, LoadMetad
private static final Log mLog = LogFactory.getLog(BinStorage.class);
protected long end = Long.MAX_VALUE;
- static String casterString = null;
- static LoadCaster caster = null;
+ private String casterString = null;
+ private LoadCaster caster = null;
private BinStorageRecordReader recReader = null;
private BinStorageRecordWriter recWriter = null;
Modified: pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/JsonStorage.java Mon Feb 24 21:41:38 2014
@@ -113,7 +113,7 @@ public class JsonStorage extends StoreFu
UDFContext udfc = UDFContext.getUDFContext();
Properties p =
udfc.getUDFProperties(this.getClass(), new String[]{udfcSignature});
- p.setProperty(SCHEMA_SIGNATURE, s.toString());
+ p.setProperty(SCHEMA_SIGNATURE, fixSchema(s).toString());
}
@@ -310,4 +310,12 @@ public class JsonStorage extends StoreFu
metadataWriter.storeSchema(schema, location, job);
}
+ /**
+  * Rewrites every top-level field whose type is {@link DataType#NULL} to
+  * {@link DataType#BYTEARRAY}. The schema is modified in place and the same
+  * instance is returned, so the call can be chained (as checkSchema does).
+  * NOTE(review): nested tuple/bag/map sub-schemas are not visited — confirm
+  * that only top-level NULL fields need fixing.
+  *
+  * @param s schema to adjust in place
+  * @return {@code s} itself
+  */
+ public ResourceSchema fixSchema(ResourceSchema s){
+     for (ResourceFieldSchema field : s.getFields()) {
+         boolean isNullTyped = field.getType() == DataType.NULL;
+         if (isNullTyped) {
+             field.setType(DataType.BYTEARRAY);
+         }
+     }
+     return s;
+ }
+
}
Modified: pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/builtin/PigStorage.java Mon Feb 24 21:41:38 2014
@@ -27,10 +27,14 @@ import org.apache.commons.cli.CommandLin
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
@@ -50,6 +54,7 @@ import org.apache.pig.LoadCaster;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
+import org.apache.pig.OverwritableStoreFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
@@ -61,10 +66,12 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.CastUtils;
import org.apache.pig.impl.util.ObjectSerializer;
@@ -125,7 +132,7 @@ import org.apache.pig.parser.ParserExcep
*/
@SuppressWarnings("unchecked")
public class PigStorage extends FileInputLoadFunc implements StoreFuncInterface,
-LoadPushDown, LoadMetadata, StoreMetadata {
+LoadPushDown, LoadMetadata, StoreMetadata, OverwritableStoreFunc {
protected RecordReader in = null;
protected RecordWriter writer = null;
protected final Log mLog = LogFactory.getLog(getClass());
@@ -138,6 +145,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
boolean isSchemaOn = false;
boolean dontLoadSchema = false;
+ boolean overwriteOutput = false;
protected ResourceSchema schema;
protected LoadCaster caster;
@@ -161,6 +169,8 @@ LoadPushDown, LoadMetadata, StoreMetadat
validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
+ Option overwrite = OptionBuilder.hasOptionalArgs(1).withArgName("overwrite").withLongOpt("overwrite").withDescription("Overwrites the destination.").create();
+ validOptions.addOption(overwrite);
}
public PigStorage() {
@@ -200,6 +210,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
try {
configuredOptions = parser.parse(validOptions, optsArr);
isSchemaOn = configuredOptions.hasOption("schema");
+ if (configuredOptions.hasOption("overwrite")) {
+ String value = configuredOptions.getOptionValue("overwrite");
+ if ("true".equalsIgnoreCase(value)) {
+ overwriteOutput = true;
+ }
+ }
dontLoadSchema = configuredOptions.hasOption("noschema");
tagFile = configuredOptions.hasOption(TAG_SOURCE_FILE);
tagPath = configuredOptions.hasOption(TAG_SOURCE_PATH);
@@ -307,8 +323,12 @@ LoadPushDown, LoadMetadata, StoreMetadat
// only contains required fields.
// We walk the requiredColumns array to find required fields,
// and cast those.
- for (int i = 0; i < Math.min(fieldSchemas.length, tup.size()); i++) {
+ for (int i = 0; i < fieldSchemas.length; i++) {
if (mRequiredColumns == null || (mRequiredColumns.length>i && mRequiredColumns[i])) {
+ if (tupleIdx >= tup.size()) {
+ tup.append(null);
+ }
+
Object val = null;
if(tup.get(tupleIdx) != null){
byte[] bytes = ((DataByteArray) tup.get(tupleIdx)).get();
@@ -319,9 +339,6 @@ LoadPushDown, LoadMetadata, StoreMetadat
tupleIdx++;
}
}
- for (int i = tup.size(); i < fieldSchemas.length; i++) {
- tup.append(null);
- }
}
return tup;
}
@@ -567,4 +584,24 @@ LoadPushDown, LoadMetadata, StoreMetadat
Job job) throws IOException {
}
+
+ /**
+  * Reports whether the store location should be overwritten. This is true only
+  * when the constructor received {@code -overwrite} with the value "true"
+  * (case-insensitive).
+  *
+  * @return {@code true} if existing output should be replaced
+  */
+ @Override
+ public boolean shouldOverwrite() {
+     return overwriteOutput;
+ }
+
+ /**
+  * Recursively deletes the job's output directory, which is read from
+  * {@code mapred.output.dir}, so the store can overwrite it. If that key is
+  * missing or empty there is nothing to delete, and the method only logs a
+  * warning. Failures while deleting are logged, not rethrown (best effort).
+  *
+  * @param store the store operator being cleaned up (unused here)
+  * @param job job whose configuration names the output directory
+  * @throws IOException if the file system for the output path cannot be obtained
+  */
+ @Override
+ public void cleanupOutput(POStore store, Job job) throws IOException {
+     Configuration conf = job.getConfiguration();
+     String output = conf.get("mapred.output.dir");
+     if (output == null || output.isEmpty()) {
+         // Guard: a null Path would NPE on getFileSystem, and new Path("") rejects
+         // the empty string, so bail out instead of failing the job.
+         mLog.warn("Could not delete output: mapred.output.dir is not set");
+         return;
+     }
+     Path outputPath = new Path(output);
+     FileSystem fs = outputPath.getFileSystem(conf);
+     try {
+         fs.delete(outputPath, true);
+     } catch (Exception e) {
+         // Best effort: keep going, but log the cause so the failure can be diagnosed.
+         mLog.warn("Could not delete output " + output, e);
+     }
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/SelfSpillBag.java Mon Feb 24 21:41:38 2014
@@ -56,6 +56,19 @@ public abstract class SelfSpillBag exten
private int cacheLimit = Integer.MAX_VALUE;
private long memUsage = 0;
private long numObjsSizeChecked = 0;
+
+ /** Maximum heap size of this JVM, sampled once when the class is initialized. */
+ private static final long maxMem = Runtime.getRuntime().maxMemory();
+
+ /**
+  * Heap fraction that {@code init} uses when it is given a negative percent.
+  * It is computed once, by {@link #readCachedMemUsage()}, during class initialization.
+  */
+ private static final float cachedMemUsage = readCachedMemUsage();
+
+ /**
+  * Reads {@code PigConfiguration.PROP_CACHEDBAG_MEMUSAGE} from the configuration
+  * returned by {@code PigMapReduce.sJobConfInternal.get()}. Returns 0.2 when that
+  * configuration or the property is absent. A malformed value makes
+  * {@link Float#parseFloat} throw; because this runs inside class initialization,
+  * that failure surfaces as an ExceptionInInitializerError.
+  * NOTE(review): the lookup happens only at class-initialization time, so a job
+  * configuration set after the class is first loaded is never consulted.
+  * Confirm this is intended.
+  */
+ private static float readCachedMemUsage() {
+     if (PigMapReduce.sJobConfInternal.get() == null) {
+         return 0.2F;
+     }
+     String usage = PigMapReduce.sJobConfInternal.get().get(
+             PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
+     return (usage != null) ? Float.parseFloat(usage) : 0.2F;
+ }
/**
* @param bagCount
@@ -68,18 +81,10 @@ public abstract class SelfSpillBag exten
private void init(int bagCount, float percent) {
if (percent < 0) {
- percent = 0.2F;
- if (PigMapReduce.sJobConfInternal.get() != null) {
- String usage = PigMapReduce.sJobConfInternal.get().get(
- PigConfiguration.PROP_CACHEDBAG_MEMUSAGE);
- if (usage != null) {
- percent = Float.parseFloat(usage);
- }
- }
+ percent = cachedMemUsage;
}
- long max = Runtime.getRuntime().maxMemory();
- maxMemUsage = (long) ((max * percent) / bagCount);
+ maxMemUsage = (long) ((maxMem * percent) / bagCount);
// set limit to 0, if memusage is 0 or really really small.
// then all tuples are put into disk
Modified: pig/branches/tez/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/PigContext.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/PigContext.java Mon Feb 24 21:41:38 2014
@@ -23,9 +23,9 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
+import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-import java.io.IOException;
import java.io.Serializable;
import java.io.StringWriter;
import java.lang.reflect.Constructor;
@@ -118,6 +118,9 @@ public class PigContext implements Seria
// (some functions may come from pig.jar and we don't want the whole jar file.)
transient public Vector<String> skipJars = new Vector<String>(2);
+ // jars that are predeployed to the cluster and thus should not be merged in at all (even subsets).
+ transient public Vector<String> predeployedJars = new Vector<String>(2);
+
// script files that are needed to run a job
@Deprecated
public List<String> scriptFiles = new ArrayList<String>();
@@ -145,6 +148,9 @@ public class PigContext implements Seria
private static ThreadLocal<ArrayList<String>> packageImportList =
new ThreadLocal<ArrayList<String>>();
+ private static ThreadLocal<Map<String,Class<?>>> classCache =
+ new ThreadLocal<Map<String,Class<?>>>();
+
private Properties log4jProperties = new Properties();
private Level defaultLogLevel = Level.INFO;
@@ -355,6 +361,17 @@ public class PigContext implements Seria
Thread.currentThread().setContextClassLoader(PigContext.classloader);
}
}
+
+ /**
+  * Registers {@code path} as a jar that is already installed on the Hadoop
+  * cluster by appending it to {@link #predeployedJars}. Jars registered here are
+  * intended to be left out of the generated job jar entirely, which keeps the
+  * job jar smaller.
+  *
+  * @param path location of the pre-installed jar
+  */
+ public void markJarAsPredeployed(String path) {
+     this.predeployedJars.add(path);
+ }
public String doParamSubstitution(InputStream in,
List<String> params,
@@ -606,12 +623,30 @@ public class PigContext implements Seria
return new ContextClassLoader(urls, PigContext.class.getClassLoader());
}
+ /**
+  * Returns the calling thread's name-to-class cache held in the
+  * {@code classCache} ThreadLocal. On first use in a thread, an empty map is
+  * created and registered.
+  */
+ private static Map<String,Class<?>> getClassCache() {
+     Map<String,Class<?>> cache = classCache.get();
+     if (cache != null) {
+         return cache;
+     }
+     cache = new HashMap<String,Class<?>>();
+     classCache.set(cache);
+     return cache;
+ }
+
@SuppressWarnings("rawtypes")
public static Class resolveClassName(String name) throws IOException{
+ Map<String,Class<?>> cache = getClassCache();
+
+ Class c = cache.get(name);
+ if (c != null) {
+ return c;
+ }
+
for(String prefix: getPackageImportList()) {
- Class c;
try {
c = Class.forName(prefix+name,true, PigContext.classloader);
+ cache.put(name, c);
+
return c;
}
catch (ClassNotFoundException e) {
Modified: pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/PigImplConstants.java Mon Feb 24 21:41:38 2014
@@ -36,4 +36,9 @@ public class PigImplConstants {
* the set of disabled optimizer rules.
*/
public static final String PIG_OPTIMIZER_RULES_KEY = "pig.optimizer.rules";
+
+ /**
+ * Used by pig to indicate that current job has been converted to run in local mode
+ */
+ public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
}
Modified: pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/FileLocalizer.java Mon Feb 24 21:41:38 2014
@@ -39,8 +39,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Shell;
import org.apache.pig.ExecType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -51,6 +54,7 @@ import org.apache.pig.backend.datastorag
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
+import org.apache.pig.backend.hadoop.datastorage.HDirectory;
import org.apache.pig.backend.hadoop.datastorage.HPath;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.impl.PigContext;
@@ -64,11 +68,14 @@ public class FileLocalizer {
static public final int STYLE_UNIX = 0;
static public final int STYLE_WINDOWS = 1;
+ /**
+  * Owner-only permission ({@code rwx------}) applied to Pig's temporary root
+  * directory. It is declared {@code final} so no caller can swap in a more
+  * permissive value.
+  * NOTE(review): FsPermission itself is still a mutable object, so callers
+  * should not mutate this instance.
+  */
+ public static final FsPermission OWNER_ONLY_PERMS = new FsPermission(FsAction.ALL, FsAction.NONE,
+         FsAction.NONE); // rwx------
+
public static class DataStorageInputStreamIterator extends InputStream {
InputStream current;
ElementDescriptor[] elements;
int currentElement;
-
+
public DataStorageInputStreamIterator(ElementDescriptor[] elements) {
this.elements = elements;
}
@@ -164,7 +171,7 @@ public class FileLocalizer {
throw new RuntimeException(
"can't open DFS file while executing locally");
}
-
+
return openDFSFile(fileName, ConfigurationUtil.toProperties(conf));
}
@@ -174,7 +181,7 @@ public class FileLocalizer {
ElementDescriptor elem = dds.asElement(fileName);
return openDFSFile(elem);
}
-
+
public static long getSize(String fileName) throws IOException {
Configuration conf = PigMapReduce.sJobConfInternal.get();
@@ -185,28 +192,28 @@ public class FileLocalizer {
return getSize(fileName, ConfigurationUtil.toProperties(conf));
}
-
+
public static long getSize(String fileName, Properties properties) throws IOException {
DataStorage dds = new HDataStorage(properties);
ElementDescriptor elem = dds.asElement(fileName);
-
+
// recursively get all the files under this path
ElementDescriptor[] allElems = getFileElementDescriptors(elem);
-
+
long size = 0;
-
+
// add up the sizes of all files found
for (int i=0; i<allElems.length; i++) {
Map<String, Object> stats = allElems[i].getStatistics();
size += (Long) (stats.get(ElementDescriptor.LENGTH_KEY));
}
-
+
return size;
}
-
+
private static InputStream openDFSFile(ElementDescriptor elem) throws IOException{
ElementDescriptor[] elements = null;
-
+
if (elem.exists()) {
try {
if(! elem.getDataStorage().isContainer(elem.toString())) {
@@ -218,7 +225,7 @@ public class FileLocalizer {
catch (DataStorageException e) {
throw new IOException("Failed to determine if elem=" + elem + " is container", e);
}
-
+
// elem is a directory - recursively get all files in it
elements = getFileElementDescriptors(elem);
} else {
@@ -226,15 +233,15 @@ public class FileLocalizer {
if (!globMatchesFiles(elem, elem.getDataStorage())) {
throw new IOException(elem.toString() + " does not exist");
} else {
- elements = getFileElementDescriptors(elem);
+ elements = getFileElementDescriptors(elem);
return new DataStorageInputStreamIterator(elements);
-
+
}
}
-
+
return new DataStorageInputStreamIterator(elements);
}
-
+
/**
* recursively get all "File" element descriptors present in the input element descriptor
* @param elem input element descriptor
@@ -259,7 +266,7 @@ public class FileLocalizer {
if (fullPath.systemElement()) {
continue;
}
-
+
if (fullPath instanceof ContainerDescriptor) {
for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
paths.add(child);
@@ -274,7 +281,7 @@ public class FileLocalizer {
filePaths.toArray(elems);
return elems;
}
-
+
private static InputStream openLFSFile(ElementDescriptor elem) throws IOException{
// IMPORTANT NOTE: Currently we use HXXX classes to represent
// files and dirs in local mode - so we can just delegate this
@@ -282,7 +289,7 @@ public class FileLocalizer {
// and dirs THIS WILL NEED TO CHANGE
return openDFSFile(elem);
}
-
+
/**
* This function returns an input stream to a local file system file or
* a file residing on Hadoop's DFS
@@ -306,7 +313,7 @@ public class FileLocalizer {
return openLFSFile(elem);
}
}
-
+
/**
* @deprecated Use {@link #fullPath(String, PigContext)} instead
*/
@@ -317,7 +324,7 @@ public class FileLocalizer {
if (fileName.charAt(0) != '/') {
ElementDescriptor currentDir = storage.getActiveContainer();
ElementDescriptor elem = storage.asElement(currentDir.toString(), fileName);
-
+
fullPath = elem.toString();
} else {
fullPath = fileName;
@@ -327,7 +334,7 @@ public class FileLocalizer {
}
return fullPath;
}
-
+
static public InputStream open(String fileSpec, PigContext pigContext) throws IOException {
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
if (!fileSpec.startsWith(LOCAL_PREFIX)) {
@@ -342,35 +349,35 @@ public class FileLocalizer {
return openLFSFile(elem);
}
}
-
+
/**
* @param fileSpec
* @param offset
* @param pigContext
* @return SeekableInputStream
* @throws IOException
- *
+ *
* This is an overloaded version of open where there is a need to seek in stream. Currently seek is supported
* only in file, not in directory or glob.
*/
static public SeekableInputStream open(String fileSpec, long offset, PigContext pigContext) throws IOException {
-
+
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
-
+
ElementDescriptor elem;
- if (!fileSpec.startsWith(LOCAL_PREFIX))
+ if (!fileSpec.startsWith(LOCAL_PREFIX))
elem = pigContext.getDfs().asElement(fullPath(fileSpec, pigContext));
-
+
else{
fileSpec = fileSpec.substring(LOCAL_PREFIX.length());
- elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
+ elem = pigContext.getLfs().asElement(fullPath(fileSpec, pigContext));
}
-
+
if (elem.exists() && (!elem.getDataStorage().isContainer(elem.toString()))) {
try {
if (elem.systemElement())
throw new IOException ("Attempt is made to open system file " + elem.toString());
-
+
SeekableInputStream sis = elem.sopen();
sis.seek(offset, FLAGS.SEEK_SET);
return sis;
@@ -383,7 +390,7 @@ public class FileLocalizer {
else
throw new IOException("Currently seek is supported only in a file, not in glob or directory.");
}
-
+
static public OutputStream create(String fileSpec, PigContext pigContext) throws IOException{
return create(fileSpec,false,pigContext);
}
@@ -404,11 +411,11 @@ public class FileLocalizer {
if (!res)
log.warn("FileLocalizer.create: failed to create " + f);
}
-
+
return new FileOutputStream(fileSpec,append);
}
}
-
+
static public boolean delete(String fileSpec, PigContext pigContext) throws IOException{
fileSpec = checkDefaultPrefix(pigContext.getExecType(), fileSpec);
ElementDescriptor elem = null;
@@ -455,13 +462,28 @@ public class FileLocalizer {
throws DataStorageException {
if (relativeRoot.get() == null) {
- String tdir= pigContext.getProperties().getProperty("pig.temp.dir", "/tmp");
- relativeRoot.set(pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt()));
+ String tdir= pigContext.getProperties().getProperty(PigConfiguration.PIG_TEMP_DIR, "/tmp");
+ ContainerDescriptor relative = pigContext.getDfs().asContainer(tdir + "/temp" + r.nextInt());
+ relativeRoot.set(relative);
+ try {
+ if (!relative.exists()) {
+ createRelativeRoot(relative);
+ }
+ } catch (IOException e) {
+ throw new DataStorageException(e);
+ }
}
return relativeRoot.get();
}
+ /**
+  * Creates the temporary root container. When the container is a Hadoop
+  * {@link HDirectory}, its permissions are then restricted to
+  * {@link #OWNER_ONLY_PERMS}.
+  * NOTE(review): permissions are applied after creation, so the directory
+  * briefly has the file system's default permissions.
+  *
+  * @param root container to create
+  * @throws IOException if creation or the permission change fails
+  */
+ private static void createRelativeRoot(ContainerDescriptor root) throws IOException {
+     root.create();
+     if (!(root instanceof HDirectory)) {
+         return;
+     }
+     HDirectory dir = (HDirectory) root;
+     dir.setPermission(OWNER_ONLY_PERMS);
+ }
+
public static void deleteTempFiles() {
if (relativeRoot.get() != null) {
try {
@@ -480,9 +502,6 @@ public class FileLocalizer {
public static Path getTemporaryPath(PigContext pigContext, String suffix) throws IOException {
ElementDescriptor relative = relativeRoot(pigContext);
- if (!relativeRoot(pigContext).exists()) {
- relativeRoot(pigContext).create();
- }
ElementDescriptor elem=
pigContext.getDfs().asElement(relative.toString(), "tmp" + r.nextInt() + suffix);
return ((HPath)elem).getPath();
@@ -495,30 +514,30 @@ public class FileLocalizer {
if (filename.startsWith(LOCAL_PREFIX)) {
filename = filename.substring(LOCAL_PREFIX.length());
}
-
+
ElementDescriptor localElem =
pigContext.getLfs().asElement(filename);
-
+
if (!localElem.exists()) {
throw new FileNotFoundException(filename);
}
-
+
ElementDescriptor distribElem = pigContext.getDfs().asElement(
getTemporaryPath(pigContext).toString());
-
+
int suffixStart = filename.lastIndexOf('.');
if (suffixStart != -1) {
distribElem = pigContext.getDfs().asElement(distribElem.toString() +
filename.substring(suffixStart));
}
-
+
// TODO: currently the copy method in Data Storage does not allow to specify overwrite
// so the work around is to delete the dst file first, if it exists
if (distribElem.exists()) {
distribElem.delete();
}
localElem.copy(distribElem, null, false);
-
+
return distribElem.toString();
}
@@ -528,7 +547,7 @@ public class FileLocalizer {
ElementDescriptor currentDir = pigContext.getDfs().getActiveContainer();
ElementDescriptor elem = pigContext.getDfs().asElement(currentDir.toString(),
filename);
-
+
return elem.toString();
}
return filename;
@@ -546,7 +565,7 @@ public class FileLocalizer {
/**
* @deprecated Use {@link #fileExists(String, PigContext)} instead
*/
- @Deprecated
+ @Deprecated
public static boolean fileExists(String filename, DataStorage store)
throws IOException {
ElementDescriptor elem = store.asElement(filename);
@@ -559,7 +578,7 @@ public class FileLocalizer {
}
/**
- * @deprecated Use {@link #isFile(String, PigContext)} instead
+ * @deprecated Use {@link #isFile(String, PigContext)} instead
*/
@Deprecated
public static boolean isFile(String filename, DataStorage store)
@@ -593,10 +612,10 @@ public class FileLocalizer {
switch (elems.length) {
case 0:
return false;
-
+
case 1:
return !elems[0].equals(elem);
-
+
default:
return true;
}
@@ -614,13 +633,13 @@ public class FileLocalizer {
/**
* Convert path from Windows convention to Unix convention. Invoked under
* cygwin.
- *
+ *
* @param path
* path in Windows convention
* @return path in Unix convention, null if fail
*/
static public String parseCygPath(String path, int style) {
- String[] command;
+ String[] command;
if (style==STYLE_WINDOWS)
command = new String[] { "cygpath", "-w", path };
else
@@ -653,7 +672,7 @@ public class FileLocalizer {
}
return line;
}
-
+
static File localTempDir = null;
static {
File f;
@@ -669,8 +688,8 @@ public class FileLocalizer {
if (!success) {
throw new RuntimeException("Error creating FileLocalizer temp directory.");
}
- }
-
+ }
+
public static class FetchFileRet {
public FetchFileRet(File file, boolean didFetch) {
this.file = file;
@@ -681,9 +700,9 @@ public class FileLocalizer {
}
/**
- * Ensures that the passed path is on the local file system, fetching it
+ * Ensures that the passed path is on the local file system, fetching it
* to the java.io.tmpdir if necessary. If pig.jars.relative.to.dfs is true
- * and dfs is not null, then a relative path is assumed to be relative to the passed
+ * and dfs is not null, then a relative path is assumed to be relative to the passed
* dfs active directory. Else they are assumed to be relative to the local working
* directory.
*/
@@ -776,21 +795,21 @@ public class FileLocalizer {
return fetchFiles;
}
-
+
/**
* Ensures that the passed resource is available from the local file system, fetching
* it to a temporary directory.
- *
- * @throws ResourceNotFoundException
+ *
+ * @throws ResourceNotFoundException
*/
public static FetchFileRet fetchResource(String name) throws IOException, ResourceNotFoundException {
FetchFileRet localFileRet = null;
InputStream resourceStream = PigContext.getClassLoader().getResourceAsStream(name);
- if (resourceStream != null) {
+ if (resourceStream != null) {
File dest = new File(localTempDir, name);
- dest.getParentFile().mkdirs();
+ dest.getParentFile().mkdirs();
dest.deleteOnExit();
-
+
OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(dest));
byte[] buffer = new byte[1024];
int len;
@@ -798,14 +817,14 @@ public class FileLocalizer {
outputStream.write(buffer,0,len);
}
outputStream.close();
-
+
localFileRet = new FetchFileRet(dest,false);
}
else
{
throw new ResourceNotFoundException(name);
}
-
+
return localFileRet;
}
}
Modified: pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/io/SequenceFileInterStorage.java Mon Feb 24 21:41:38 2014
@@ -205,6 +205,8 @@ StoreFuncInterface, LoadMetadata {
@Override
public void setStoreLocation(String location, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
+ Utils.setMapredCompressionCodecProps(conf);
FileOutputFormat.setOutputPath(job, new Path(location));
}
Modified: pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/plan/CompilationMessageCollector.java Mon Feb 24 21:41:38 2014
@@ -26,11 +26,11 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.pig.PigWarning;
/***
- * This class is used for collecting all messages (error + warning) in
- * compilation process. These messages are reported back to users
+ * This class is used for collecting all messages (error + warning) in
+ * compilation process. These messages are reported back to users
* at the end of compilation.
- *
- * iterator() has to be called after CompilationMessageCollector is fully
+ *
+ * iterator() has to be called after CompilationMessageCollector is fully
* populated otherwise the state is undefined.
*/
public class CompilationMessageCollector implements Iterable<CompilationMessageCollector.Message> {
@@ -41,49 +41,57 @@ public class CompilationMessageCollector
Warning,
Info
}
-
+
+ public enum Unknown {
+ UNKNOWN_MESSAGE_KIND;
+ public String toString() {
+ return "Aggregated unknown kind messages. Please set -Daggregate.warning=false to retrieve these messages";
+ }
+ }
+
public static class Message {
private String msg = null ;
private MessageType msgType = MessageType.Unknown ;
private Enum kind = null;
-
+
public Message(String message, MessageType messageType) {
msg = message ;
msgType = messageType ;
}
-
+
public Message(String message, MessageType messageType, Enum kind) {
- this(message, messageType);
- this.kind = kind;
+ this(message, messageType);
+ this.kind = kind;
}
-
+
public String getMessage() {
return msg ;
}
-
+
public MessageType getMessageType() {
return msgType ;
}
-
+
public Enum getKind() {
- return kind;
+ return kind;
}
}
-
+
private List<Message> messageList = new ArrayList<Message>() ;
-
+
public CompilationMessageCollector() {
// nothing here
}
-
+
public void collect(String message, MessageType messageType) {
- messageList.add(new Message(message, messageType)) ;
+ messageList.add(new Message(message, messageType,
+ Unknown.UNKNOWN_MESSAGE_KIND)) ;
}
-
+
public void collect(String message, MessageType messageType, Enum kind) {
messageList.add(new Message(message, messageType, kind)) ;
}
-
+
protected boolean hasMessageType(MessageType messageType) {
Iterator<Message> iter = iterator() ;
while(iter.hasNext()) {
@@ -93,48 +101,48 @@ public class CompilationMessageCollector
}
return false ;
}
-
+
public boolean hasError() {
- return hasMessageType(MessageType.Error);
+ return hasMessageType(MessageType.Error);
}
public Iterator<Message> iterator() {
return messageList.iterator() ;
}
-
+
public boolean hasMessage() {
return messageList.size() > 0 ;
}
-
+
public int size() {
return messageList.size() ;
}
-
+
public Message get(int i) {
return messageList.get(i) ;
}
-
+
public Map<Enum, Long> getKindAggregate(MessageType messageType) {
- Map<Enum, Long> aggMap = new HashMap<Enum, Long>();
+ Map<Enum, Long> aggMap = new HashMap<Enum, Long>();
Iterator<Message> iter = iterator() ;
while(iter.hasNext()) {
- Message message = iter.next();
+ Message message = iter.next();
if (message.getMessageType() == messageType) {
- Enum kind = message.getKind();
- if(kind != null) {
- Long count = aggMap.get(kind);
- count = (count == null? 1 : ++count);
- aggMap.put(kind, count);
- }
+ Enum kind = message.getKind();
+ if(kind != null) {
+ Long count = aggMap.get(kind);
+ count = (count == null? 1 : ++count);
+ aggMap.put(kind, count);
+ }
}
- }
- return aggMap;
+ }
+ return aggMap;
}
-
+
public static void logAggregate(Map<Enum, Long> aggMap, MessageType messageType, Log log) {
long nullCounterCount = aggMap.get(PigWarning.NULL_COUNTER_COUNT)==null?0 : aggMap.get(PigWarning.NULL_COUNTER_COUNT);
if (nullCounterCount!=0 && aggMap.size()>1) // PigWarning.NULL_COUNTER_COUNT is definitely in appMap
- logMessage("Unable to retrieve hadoop counter for " + nullCounterCount +
+ logMessage("Unable to retrieve hadoop counter for " + nullCounterCount +
" jobs, the number following warnings may not be correct", messageType, log);
for(Map.Entry<Enum, Long> e: aggMap.entrySet()) {
if (e.getKey() !=PigWarning.NULL_COUNTER_COUNT)
@@ -145,52 +153,52 @@ public class CompilationMessageCollector
logMessage(message, messageType, log);
}
}
- }
+ }
}
-
- public static void logMessages(CompilationMessageCollector messageCollector,
- MessageType messageType, boolean aggregate, Log log) {
- if(aggregate) {
- Map<Enum, Long> aggMap = messageCollector.getKindAggregate(messageType);
- logAggregate(aggMap, messageType, log);
- } else {
- Iterator<Message> messageIter = messageCollector.iterator();
- while(messageIter.hasNext()) {
- Message message = messageIter.next();
- if(message.getMessageType() == messageType) {
- logMessage(message.getMessage(), messageType, log);
- }
- }
- }
+
+ public static void logMessages(CompilationMessageCollector messageCollector,
+ MessageType messageType, boolean aggregate, Log log) {
+ if(aggregate) {
+ Map<Enum, Long> aggMap = messageCollector.getKindAggregate(messageType);
+ logAggregate(aggMap, messageType, log);
+ } else {
+ Iterator<Message> messageIter = messageCollector.iterator();
+ while(messageIter.hasNext()) {
+ Message message = messageIter.next();
+ if(message.getMessageType() == messageType) {
+ logMessage(message.getMessage(), messageType, log);
+ }
+ }
+ }
}
-
+
public void logMessages(MessageType messageType, boolean aggregate, Log log) {
- logMessages(this, messageType, aggregate, log);
+ logMessages(this, messageType, aggregate, log);
}
-
+
public static void logAllMessages(CompilationMessageCollector messageCollector, Log log) {
- Iterator<Message> messageIter = messageCollector.iterator();
- while(messageIter.hasNext()) {
- Message message = messageIter.next();
- logMessage(message.getMessage(), message.getMessageType(), log);
- }
+ Iterator<Message> messageIter = messageCollector.iterator();
+ while(messageIter.hasNext()) {
+ Message message = messageIter.next();
+ logMessage(message.getMessage(), message.getMessageType(), log);
+ }
}
-
+
public void logAllMessages(Log log) {
- logAllMessages(this, log);
+ logAllMessages(this, log);
}
private static void logMessage(String messageString, MessageType messageType, Log log) {
- switch(messageType) {
- case Info:
- log.info(messageString);
- break;
- case Warning:
- log.warn(messageString);
- break;
- case Error:
- log.error(messageString);
- }
+ switch(messageType) {
+ case Info:
+ log.info(messageString);
+ break;
+ case Warning:
+ log.warn(messageString);
+ break;
+ case Error:
+ log.error(messageString);
+ }
}
-
+
}
Modified: pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/JarManager.java Mon Feb 24 21:41:38 2014
@@ -343,6 +343,8 @@ public class JarManager {
*/
private static void addContainingJar(Vector<JarListEntry> jarList, Class clazz, String prefix, PigContext pigContext) {
String jar = findContainingJar(clazz);
+ if (pigContext.predeployedJars.contains(jar))
+ return;
if (pigContext.skipJars.contains(jar) && prefix == null)
return;
if (jar == null)
Modified: pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/ObjectSerializer.java Mon Feb 24 21:41:38 2014
@@ -29,6 +29,8 @@ import java.util.zip.DeflaterOutputStrea
import java.util.zip.InflaterInputStream;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.input.ClassLoaderObjectInputStream;
public class ObjectSerializer {
@@ -51,12 +53,15 @@ public class ObjectSerializer {
public static Object deserialize(String str) throws IOException {
if (str == null || str.length() == 0)
return null;
+ ObjectInputStream objStream = null;
try {
ByteArrayInputStream serialObj = new ByteArrayInputStream(decodeBytes(str));
- ObjectInputStream objStream = new ObjectInputStream(new InflaterInputStream(serialObj));
+ objStream = new ClassLoaderObjectInputStream(Thread.currentThread().getContextClassLoader(), new InflaterInputStream(serialObj));
return objStream.readObject();
} catch (Exception e) {
throw new IOException("Deserialization error: " + e.getMessage(), e);
+ } finally {
+ IOUtils.closeQuietly(objStream);
}
}
Modified: pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/PropertiesUtil.java Mon Feb 24 21:41:38 2014
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
public class PropertiesUtil {
private static final String DEFAULT_PROPERTIES_FILE = "/pig-default.properties";
@@ -143,6 +144,11 @@ public class PropertiesUtil {
//by default we keep going on error on the backend
properties.setProperty("stop.on.failure", ""+false);
}
+
+ if (properties.getProperty(PigConfiguration.OPT_FETCH) == null) {
+ //by default fetch optimization is on
+ properties.setProperty(PigConfiguration.OPT_FETCH, ""+true);
+ }
}
/**
Modified: pig/branches/tez/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/impl/util/Utils.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/tez/src/org/apache/pig/impl/util/Utils.java Mon Feb 24 21:41:38 2014
@@ -26,9 +26,6 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.SequenceInputStream;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketImplFactory;
import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
@@ -340,7 +337,7 @@ public class Utils {
}
public static FileInputLoadFunc getTmpFileStorageObject(Configuration conf) throws IOException {
- Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorage(ConfigurationUtil.toProperties(conf)).getStorageClass();
+ Class<? extends FileInputLoadFunc> storageClass = getTmpFileStorageClass(ConfigurationUtil.toProperties(conf));
try {
return storageClass.newInstance();
} catch (InstantiationException e) {
@@ -350,6 +347,10 @@ public class Utils {
}
}
+ public static Class<? extends FileInputLoadFunc> getTmpFileStorageClass(Properties properties) {
+ return getTmpFileStorage(properties).getStorageClass();
+ }
+
private static TEMPFILE_STORAGE getTmpFileStorage(Properties properties) {
boolean tmpFileCompression = properties.getProperty(
PigConfiguration.PIG_ENABLE_TEMP_FILE_COMPRESSION, "false").equals("true");
@@ -369,7 +370,20 @@ public class Utils {
}
}
+ public static void setMapredCompressionCodecProps(Configuration conf) {
+ String codec = conf.get(
+ PigConfiguration.PIG_TEMP_FILE_COMPRESSION_CODEC, "");
+ if ("".equals(codec) && conf.get("mapred.output.compression.codec") != null) {
+ conf.setBoolean("mapred.output.compress", true);
+ } else if(TEMPFILE_STORAGE.SEQFILE.ensureCodecSupported(codec)) {
+ conf.setBoolean("mapred.output.compress", true);
+ conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+ }
+ // otherwise leave the mapred compression properties untouched
+ }
+
public static void setTmpFileCompressionOnConf(PigContext pigContext, Configuration conf) throws IOException{
+ // PIG-3741: this is also called for non-intermediate jobs, so do not set any mapred properties here
if (pigContext == null) {
return;
}
@@ -380,7 +394,6 @@ public class Utils {
case INTER:
break;
case SEQFILE:
- conf.setBoolean("mapred.output.compress", true);
conf.set(PigConfiguration.PIG_TEMP_FILE_COMPRESSION_STORAGE, "seqfile");
if("".equals(codec)) {
// codec is not specified, ensure is set
@@ -389,7 +402,7 @@ public class Utils {
throw new IOException("mapred.output.compression.codec is not set");
}
} else if(storage.ensureCodecSupported(codec)) {
- conf.set("mapred.output.compression.codec", TEMPFILE_CODEC.valueOf(codec.toUpperCase()).getHadoopCodecClassName());
+ // supported codec; the mapred codec properties are set by setMapredCompressionCodecProps
} else {
throw new IOException("Invalid temporary file compression codec [" + codec + "]. " +
"Expected compression codecs for " + storage.getStorageClass().getName() + " are " + storage.supportedCodecsToString() + ".");
Modified: pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/FilterExtractor.java Mon Feb 24 21:41:38 2014
@@ -180,23 +180,35 @@ public class FilterExtractor {
}
private LogicalExpression andLogicalExpressions(
- LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+ LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) throws FrontendException {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
+ if (!plan.ops.contains(a)) {
+ a = a.deepCopy(plan);
+ }
+ if (!plan.ops.contains(b)) {
+ b = b.deepCopy(plan);
+ }
LogicalExpression andOp = new AndExpression(plan, a, b);
return andOp;
}
private LogicalExpression orLogicalExpressions(
- LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) {
+ LogicalExpressionPlan plan, LogicalExpression a, LogicalExpression b) throws FrontendException {
// Or 2 operators if they are not null
if (a == null || b == null) {
return null;
}
+ if (!plan.ops.contains(a)) {
+ a = a.deepCopy(plan);
+ }
+ if (!plan.ops.contains(b)) {
+ b = b.deepCopy(plan);
+ }
LogicalExpression orOp = new OrExpression(plan, a, b);
return orOp;
}
@@ -234,7 +246,7 @@ public class FilterExtractor {
// AND (leftState.filterExpr OR rightState.pushdownExpr)
// AND (leftState.filterExpr OR rightState.filterExpr)
state.pushdownExpr = orLogicalExpressions(pushdownExprPlan, leftState.pushdownExpr, rightState.pushdownExpr);
- if(state.pushdownExpr == null) {
+ if (state.pushdownExpr == null) {
// Whatever we did so far on the right tree is all wasted :(
// Undo all the mutation (AND OR distributions) until now
removeFromFilteredPlan(leftState.filterExpr);
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Mon Feb 24 21:41:38 2014
@@ -506,6 +506,10 @@ public class ExpToPhyTranslationVisitor
.getNextNodeId(DEFAULT_SCOPE)), -1,
null, op.getFuncSpec(), (EvalFunc) f);
((POUserFunc)p).setSignature(op.getSignature());
+ //reinitialize input schema from signature
+ if (((POUserFunc)p).getFunc().getInputSchema() == null) {
+ ((POUserFunc)p).setFuncInputSchema(op.getSignature());
+ }
List<String> cacheFiles = ((EvalFunc)f).getCacheFiles();
if (cacheFiles != null) {
((POUserFunc)p).setCacheFiles(cacheFiles.toArray(new String[cacheFiles.size()]));
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOForEach.java Mon Feb 24 21:41:38 2014
@@ -77,44 +77,25 @@ public class LOForEach extends LogicalRe
}
// Find the LOInnerLoad of the inner plan corresponding to the project, and
- // also find whether there is a LOForEach in inner plan along the way
+ // also find whether there is a relational operator in inner plan along the way
public static Pair<List<LOInnerLoad>, Boolean> findReacheableInnerLoadFromBoundaryProject(ProjectExpression project) throws FrontendException {
boolean needNewUid = false;
- LogicalRelationalOperator referred = project.findReferent();
- // If it is nested foreach, generate new uid
- if (referred instanceof LOForEach)
- needNewUid = true;
- List<Operator> srcs = referred.getPlan().getSources();
List<LOInnerLoad> innerLoads = new ArrayList<LOInnerLoad>();
- for (Operator src:srcs) {
- if (src instanceof LOInnerLoad) {
- if( src == referred ) {
- innerLoads.add( (LOInnerLoad)src );
- continue;
- }
-
- Deque<Operator> stack = new LinkedList<Operator>();
- List<Operator> succs = referred.getPlan().getSuccessors( src );
- if( succs != null ) {
- for( Operator succ : succs ) {
- stack.push( succ );
- }
- }
-
- while( !stack.isEmpty() ) {
- Operator op = stack.pop();
- if( op == referred ) {
- innerLoads.add((LOInnerLoad)src);
- break;
- }
- else {
- List<Operator> ops = referred.getPlan().getSuccessors( op );
- if( ops != null ) {
- for( Operator o : ops ) {
- stack.push( o );
- }
- }
- }
+ LogicalRelationalOperator referred = project.findReferent();
+ Deque<Operator> stack = new LinkedList<Operator>();
+ stack.add(referred);
+ while( !stack.isEmpty() ) {
+ Operator op = stack.pop();
+ if (op instanceof LOInnerLoad) {
+ innerLoads.add((LOInnerLoad)op);
+ }
+ else if (!(op instanceof LOGenerate)) {
+ needNewUid = true;
+ }
+ List<Operator> ops = referred.getPlan().getPredecessors( op );
+ if( ops != null ) {
+ for( Operator o : ops ) {
+ stack.push( o );
}
}
}
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Mon Feb 24 21:41:38 2014
@@ -144,11 +144,12 @@ public class LOGenerate extends LogicalR
fs.stampFieldSchema();
mergedSchema.addField(new LogicalFieldSchema(fs));
}
- if(mergedSchema.size() == 1 && mergedSchema.getField(0).type == DataType.NULL){
- //this is the use case where a new alias has been specified by user
- mergedSchema.getField(0).type = DataType.BYTEARRAY;
+ for (LogicalFieldSchema fs : mergedSchema.getFields()) {
+ if (fs.type == DataType.NULL){
+ //this is the use case where a new alias has been specified by user
+ fs.type = DataType.BYTEARRAY;
+ }
}
-
} else {
// Merge uid with the exp field schema
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/relational/LOSort.java Mon Feb 24 21:41:38 2014
@@ -31,8 +31,8 @@ import org.apache.pig.newplan.OperatorPl
import org.apache.pig.newplan.PlanVisitor;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
-import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor;
public class LOSort extends LogicalRelationalOperator{
private List<Boolean> mAscCols;
@@ -200,23 +200,6 @@ public class LOSort extends LogicalRelat
return plan.getPredecessors(this).get(0);
}
- private static class ResetProjectionAttachedRelationalOpVisitor
- extends LogicalExpressionVisitor {
- private LogicalRelationalOperator attachedRelationalOp;
-
- ResetProjectionAttachedRelationalOpVisitor (
- LogicalExpressionPlan plan, LogicalRelationalOperator op )
- throws FrontendException {
- super(plan, new ReverseDependencyOrderWalker(plan));
- this.attachedRelationalOp = op;
-
- }
- @Override
- public void visit(ProjectExpression pe) throws FrontendException {
- pe.setAttachedRelationalOp(attachedRelationalOp);
- }
- }
-
public static LOSort createCopy(LOSort sort) throws FrontendException {
LOSort newSort = new LOSort(sort.getPlan(), null,
sort.getAscendingCols(),
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/rules/InputOutputFileValidator.java Mon Feb 24 21:41:38 2014
@@ -19,7 +19,9 @@ package org.apache.pig.newplan.logical.r
import java.io.IOException;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.OverwritableStoreFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.StoreFuncInterface;
@@ -91,8 +93,22 @@ public class InputOutputFileValidator {
errCode = 4000;
break;
}
- validationErrStr += ioe.getMessage();
- throw new VisitorException(store, validationErrStr, errCode, errSrc, ioe);
+
+ boolean shouldThrowException = true;
+ if (sf instanceof OverwritableStoreFunc) {
+ if (((OverwritableStoreFunc) sf).shouldOverwrite()) {
+ if (ioe instanceof FileAlreadyExistsException
+ || ioe instanceof org.apache.hadoop.fs.FileAlreadyExistsException) {
+ shouldThrowException = false;
+ }
+ }
+ }
+ if (shouldThrowException) {
+ validationErrStr += ioe.getMessage();
+ throw new VisitorException(store, validationErrStr,
+ errCode, errSrc, ioe);
+ }
+
} catch (InterruptedException ie) {
validationErrStr += ie.getMessage();
throw new VisitorException(store, validationErrStr, errCode, pigCtx.getErrorSource(), ie);
Modified: pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java (original)
+++ pig/branches/tez/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java Mon Feb 24 21:41:38 2014
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical.v
import java.util.Map;
import org.apache.pig.FuncSpec;
+import org.apache.pig.PigWarning;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.CompilationMessageCollector;
@@ -107,7 +108,8 @@ public class CastLineageSetter extends A
String msg = "Cannot resolve load function to use for casting from " +
DataType.findTypeName(inType) + " to " +
DataType.findTypeName(outType) + ". ";
- msgCollector.collect(msg, MessageType.Warning);
+ msgCollector.collect(msg, MessageType.Warning,
+ PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY);
}else {
cast.setFuncSpec(inLoadFunc);
}
Modified: pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/AliasMasker.g Mon Feb 24 21:41:38 2014
@@ -85,11 +85,15 @@ query : ^( QUERY statement* )
statement : general_statement
| split_statement
| realias_statement
+ | assert_statement
;
split_statement : split_clause
;
+assert_statement: assert_clause
+;
+
realias_statement : realias_clause
;
@@ -132,6 +136,7 @@ op_clause : define_clause
| split_clause
| foreach_clause
| cube_clause
+ | assert_clause
;
define_clause
@@ -299,6 +304,13 @@ store_clause
: ^( STORE alias filename func_clause? )
;
+assert_clause
+ : ^( ASSERT alias cond comment? )
+;
+
+comment : QUOTEDSTRING
+;
+
filter_clause
: ^( FILTER rel cond )
;
@@ -695,6 +707,7 @@ eid : rel_str_op
| TOBAG
| TOMAP
| TOTUPLE
+ | ASSERT
;
// relational operator
Modified: pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/AstPrinter.g Mon Feb 24 21:41:38 2014
@@ -57,6 +57,7 @@ statement : general_statement
| split_statement { sb.append(";\n"); }
| import_statement { sb.append(";\n"); }
| register_statement { sb.append(";\n"); }
+ | assert_statement { sb.append(";\n"); }
| realias_statement
;
@@ -76,6 +77,9 @@ register_statement : ^( REGISTER QUOTEDS
} scripting_udf_clause? )
;
+assert_statement : assert_clause
+;
+
scripting_udf_clause : scripting_language_clause scripting_namespace_clause
;
@@ -735,6 +739,7 @@ eid : rel_str_op
| TOTUPLE { sb.append($TOTUPLE.text); }
| IN { sb.append($IN.text); }
| CASE { sb.append($CASE.text); }
+ | ASSERT { sb.append($ASSERT.text); }
;
// relational operator
Modified: pig/branches/tez/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/AstValidator.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/AstValidator.g Mon Feb 24 21:41:38 2014
@@ -117,6 +117,7 @@ statement : general_statement
| split_statement
| realias_statement
| register_statement
+ | assert_statement
;
split_statement : split_clause
@@ -128,6 +129,9 @@ realias_statement : realias_clause
register_statement : ^( REGISTER QUOTEDSTRING (USING IDENTIFIER AS IDENTIFIER)? )
;
+assert_statement : assert_clause
+;
+
general_statement : ^( STATEMENT ( alias { aliases.add( $alias.name ); } )? op_clause parallel_clause? )
;
@@ -724,6 +728,7 @@ eid : rel_str_op
| TOBAG
| TOMAP
| TOTUPLE
+ | ASSERT
;
// relational operator
Modified: pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/parser/LogicalPlanBuilder.java Mon Feb 24 21:41:38 2014
@@ -99,6 +99,7 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.rules.OptimizerUtils;
import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
+import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor;
public class LogicalPlanBuilder {
@@ -296,6 +297,18 @@ public class LogicalPlanBuilder {
}
// using De Morgan's law (!A && !B) == !(A || B)
currentExpr = new NotExpression(splitPlan, currentExpr);
+
+ try {
+ // Go through all the ProjectExpressions that were cloned
+ // and update their attached operator from the original
+ // LOSplitOutput to the "otherwise" LOSplitOutput
+ // (PIG-3641)
+ new ResetProjectionAttachedRelationalOpVisitor(splitPlan, op).visit();
+ } catch (FrontendException e) {
+ e.printStackTrace();
+ throw new PlanGenerationFailureException(intStream, loc, e);
+ }
+
op.setFilterPlan(splitPlan);
return buildOp(loc, op, alias, inputAlias, null);
}
Modified: pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/LogicalPlanGenerator.g Mon Feb 24 21:41:38 2014
@@ -173,6 +173,7 @@ scope {
: general_statement
| split_statement
| realias_statement
+ | assert_statement
| register_statement
;
@@ -182,6 +183,9 @@ split_statement : split_clause
realias_statement : realias_clause
;
+assert_statement : assert_clause
+;
+
register_statement
: ^( REGISTER QUOTEDSTRING (USING IDENTIFIER AS IDENTIFIER)? )
{
@@ -2002,6 +2006,7 @@ eid returns[String id] : rel_str_op { $i
| TOBAG { $id = "TOBAG"; }
| TOMAP { $id = "TOMAP"; }
| TOTUPLE { $id = "TOTUPLE"; }
+ | ASSERT { $id = "ASSERT"; }
;
// relational operator
Modified: pig/branches/tez/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/parser/QueryParser.g?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/tez/src/org/apache/pig/parser/QueryParser.g Mon Feb 24 21:41:38 2014
@@ -224,6 +224,7 @@ statement : SEMI_COLON!
| import_clause SEMI_COLON!
| realias_clause SEMI_COLON!
| register_clause SEMI_COLON!
+ | assert_clause SEMI_COLON!
// semicolons after foreach_complex_statement are optional for backwards compatibility, but to keep
// the grammar unambiguous if there is one then we'll parse it as a single, standalone semicolon
// (which matches the first statement rule)
@@ -382,7 +383,6 @@ op_clause : define_clause
| union_clause
| stream_clause
| mr_clause
- | assert_clause
;
ship_clause : SHIP^ LEFT_PAREN! path_list? RIGHT_PAREN!
@@ -1029,6 +1029,7 @@ eid_without_columns : rel_str_op
| FULL
| REALIAS
| BOOL_COND
+ | ASSERT
;
eid : eid_without_columns
@@ -1075,6 +1076,5 @@ reserved_identifier_whitelist : RANK
| THEN
| ELSE
| END
- | ASSERT
;
Modified: pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Mon Feb 24 21:41:38 2014
@@ -247,11 +247,12 @@ TOKEN_MGR_DECLS : {
{
<"split"> : PIG_START
| <"define"> : PIG_START
-| <"store"> : PIG_START
-| <"import"> : PIG_START
-| <(["a"-"z", "A"-"Z"])+(["a"-"z", "A"-"Z"] | ["0"-"9"] | "_")*(" " | "\t")*"="> : PIG_START
+| <"store"> : PIG_START
+| <"assert"> : PIG_START
+| <"import"> : PIG_START
+| <(["a"-"z", "A"-"Z"])+(["a"-"z", "A"-"Z"] | ["0"-"9"] | "_")*(" " | "\t")*"="> : PIG_START
| <"=>" (" " | "\t")*> : PIG_START
-| < <IDENTIFIER> (" " | "\t")* ("," (" " | "\t")* <IDENTIFIER> )* (" " | "\t")* "="> : PIG_START
+| < <IDENTIFIER> (" " | "\t")* ("," (" " | "\t")* <IDENTIFIER> )* (" " | "\t")* "="> : PIG_START
| < <IDENTIFIER> (" " | "\t")* "(" > : PIG_START
}
Modified: pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Mon Feb 24 21:41:38 2014
@@ -142,15 +142,24 @@ public class PigStatsUtil {
public static void setErrorMessage(String msg) {
- PigStats.get().setErrorMessage(msg);
+ PigStats ps = PigStats.get();
+ if (ps != null) {
+ ps.setErrorMessage(msg);
+ }
}
public static void setErrorCode(int code) {
- PigStats.get().setErrorCode(code);
+ PigStats ps = PigStats.get();
+ if (ps != null) {
+ ps.setErrorCode(code);
+ }
}
public static void setErrorThrowable(Throwable t) {
- PigStats.get().setErrorThrowable(t);
+ PigStats ps = PigStats.get();
+ if (ps != null) {
+ ps.setErrorThrowable(t);
+ }
}
private static Pattern pattern = Pattern.compile("tmp(-)?[\\d]{1,10}$");
Propchange: pig/branches/tez/src/pig-default.properties
------------------------------------------------------------------------------
Merged /pig/trunk/src/pig-default.properties:r1554090-1571421
Modified: pig/branches/tez/test/e2e/pig/tests/cmdline.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/cmdline.conf?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/cmdline.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/cmdline.conf Mon Feb 24 21:41:38 2014
@@ -68,6 +68,7 @@ describe A;\,
# #JIRA[PIG-373]
# {
# 'num' => 4,
+# 'java_params' => ['-Dopt.fetch=false'],
# 'pig' => q\
#A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray);
#describe A;
@@ -240,6 +241,7 @@ describe D;\,
{
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
A = load ':INPATH:/singlefile/unicode100' as (name:chararray);
dump A;\,
Modified: pig/branches/tez/test/e2e/pig/tests/negative.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/negative.conf?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/negative.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/negative.conf Mon Feb 24 21:41:38 2014
@@ -34,6 +34,7 @@ $cfg = {
'tests' => [
{
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
b = group a by name;
@@ -48,6 +49,7 @@ dump c;\,
'tests' => [
{
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\a = load '/user/gates/nosuchfile'; dump a;\,
'expected_err_regex' => "ERROR 2118: Input path does not exist",
},
@@ -248,6 +250,7 @@ store a into ':INPATH:/singlefile/fileex
{
# missing quotes around command
'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
A = load ':INPATH:/singlefile/studenttab10k';
B = foreach A generate $2, $1, $0;
@@ -259,6 +262,7 @@ dump C;#,
{
# input spec missing parenthesis
'num' => 2,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define CMD `perl PigStreaming.pl foo -` input 'foo' using PigStorage() ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -269,6 +273,7 @@ dump B;#,
{
# no serializer name after using
'num' => 3,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define CMD `perl PigStreaming.pl foo -` output ('foo' using );
A = load ':INPATH:/singlefile/studenttab10k';
@@ -279,6 +284,7 @@ dump B;#,
{
# alias name missing from define
'num' => 4,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define `perl PigStreaming.pl foo -`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -289,6 +295,7 @@ dump B;#,
{
# quotes missing from name of the file in ship script
'num' => 5,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q#
define CMD `perl PigStreaming.pl foo -` ship(:SCRIPTHOMEPATH:/PigStreaming.pl);
A = load ':INPATH:/singlefile/studenttab10k';
@@ -306,6 +313,7 @@ dump B;#,
# Define uses using non-existent command (autoship)
'num' => 1,
'execonly' => 'mapred',
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreamingNotThere.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -316,6 +324,7 @@ dump B;\,
{
# Define uses non-existent command with ship clause
'num' => 2,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreamingNotThere.pl foo -` ship(':SCRIPTHOMEPATH:/PigStreamingNotThere.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -339,6 +348,7 @@ dump E;\,
{
# Define uses non-existent serializer
'num' => 4,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl foo -` input('foo' using SerializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -349,6 +359,7 @@ dump B;\,
{
# Define uses non-existent deserializer
'num' => 5,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl` output(stdout using DeserializerNotThere()) ship(':SCRIPTHOMEPATH:/PigStreaming.pl');
A = load ':INPATH:/singlefile/studenttab10k';
@@ -359,6 +370,7 @@ dump B;\,
{
# Invalid skip path
'num' => 6,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
set stream.skippath 'foo';
define CMD `perl PigStreaming.pl`;
@@ -370,6 +382,7 @@ dump B;\,
{
# Invalid command alias in stream operator
'num' => 7,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -380,6 +393,7 @@ dump B;\,
{
# Invalid operator alias in stream operator
'num' => 8,
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreaming.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
@@ -483,6 +497,7 @@ store D into ':OUTPATH:';\,
# Define uses using non-existent command
'num' => 1,
'execonly' => 'local',
+ 'java_params' => ['-Dopt.fetch=false'],
'pig' => q\
define CMD `perl PigStreamingNotThere.pl`;
A = load ':INPATH:/singlefile/studenttab10k';
Modified: pig/branches/tez/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/tez/test/e2e/pig/tests/nightly.conf?rev=1571454&r1=1571453&r2=1571454&view=diff
==============================================================================
--- pig/branches/tez/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/tez/test/e2e/pig/tests/nightly.conf Mon Feb 24 21:41:38 2014
@@ -2173,6 +2173,14 @@ describe A;
store A into ':OUTPATH:';\,
},
+ {
+ 'num' => 2,
+ 'pig' => q\
+A = load 'sample' as (line:chararray);
+B = foreach A generate flatten(STRSPLIT(line)) as (i0, i1, i2);
+describe B;\,
+ 'expected_out_regex' => 'B: {i0: bytearray,i1: bytearray,i2: bytearray}',
+ },
],
},
{
@@ -4499,9 +4507,9 @@ store C into ':OUTPATH:';\,
},
{
# Test Union using merge with incompatible types. float->bytearray and chararray->bytearray
- 'num' => 8,
- 'delimiter' => ' ',
- 'pig' => q\
+ 'num' => 8,
+ 'delimiter' => ' ',
+ 'pig' => q\
A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int);
B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:chararray);
C = union onschema A, B;
@@ -4511,17 +4519,18 @@ A = load ':INPATH:/singlefile/studenttab
B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray);
C = union A, B;
store C into ':OUTPATH:';\,
- }
- ]
+ }
+ ]
},
- {
+ {
- # Test Union using merge with Simple data types
- 'name' => 'UdfDistributedCache',
+ # Test Union using merge with Simple data types
+ 'name' => 'UdfDistributedCache',
'tests' => [
- {
- 'num' => 1,
+ {
+ 'num' => 1,
+ 'java_params' => ['-Dopt.fetch=false'],
'execonly' => 'mapred', # since distributed cache is not supported in local mode
'pig' => q?
register :FUNCPATH:/testudf.jar;
@@ -4531,8 +4540,8 @@ store C into ':OUTPATH:';\,
c = foreach b generate udfdc(age);
dump c;?,
'expected_out_regex' => ":UdfDistributedCache_1_out:",
- },
- ]
+ },
+ ]
}, {
'name' => 'MonitoredUDF',
'tests' => [
@@ -4793,6 +4802,24 @@ store C into ':OUTPATH:';\,
I = limit H 3;
J = foreach I generate contributions;
STORE J INTO ':OUTPATH:.2';?,
+ }, {
+ # PIG-3641
+ 'num' => 6,
+ 'pig' => q?A = LOAD ':INPATH:/singlefile/votertab10k' AS (name, age, registration, contributions);
+ -- dropping one column to force columnprune
+ B = foreach A generate name, age, registration;
+ -- Next line is the only difference
+ SPLIT B into C1 if age > 50, C2 otherwise;
+ D1 = foreach C1 generate age, registration;
+ STORE D1 INTO ':OUTPATH:.1';
+ STORE C2 INTO ':OUTPATH:.2';?,
+ 'verify_pig_script' => q?A = LOAD ':INPATH:/singlefile/votertab10k' AS (name, age, registration, contributions);
+ -- dropping one column to force columnprune
+ B = foreach A generate name, age, registration;
+ SPLIT B into C1 if age > 50, C2 if age <= 50;
+ D1 = foreach C1 generate age, registration;
+ STORE D1 INTO ':OUTPATH:.1';
+ STORE C2 INTO ':OUTPATH:.2';?,
}
],
},{