You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC

svn commit: r1784237 [2/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java Fri Feb 24 08:19:42 2017
@@ -21,6 +21,7 @@ package org.apache.pig.piggybank.evaluat
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -43,7 +44,7 @@ import org.apache.pig.impl.logicalLayer.
  * 
  * @author Vadim Zaliva <lo...@codemindes.com>
  */
-public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic
+public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic, Accumulator<Tuple>
 {
     /**
      * Indicates once for how many items progress hartbeat should be sent.
@@ -131,6 +132,11 @@ public class MaxTupleBy1stField extends
     protected static Tuple max(Tuple input, PigProgressable reporter) throws ExecException
     {
         DataBag values = (DataBag) input.get(0);
+        // Unwrap the bag held in field 0 and delegate to the DataBag overload.
+        return max(values,reporter);
+    }
+
+    protected static Tuple max(DataBag values, PigProgressable reporter) throws ExecException
+    {
 
         // if we were handed an empty bag, return NULL
         // this is in compliance with SQL standard
@@ -183,4 +189,44 @@ public class MaxTupleBy1stField extends
         return Final.class.getName();
     }
 
+
+    /**
+     * Accumulator implementation
+     */
+
+    // Running maximum across accumulate() calls; null until a non-empty batch is seen.
+    private Tuple intermediate = null;
+
+    /**
+     * Accumulate implementation - folds the next batch of input into the running maximum.
+     * The maximum of the incoming batch is computed first and then compared with the
+     * maximum kept from earlier batches, so the (possibly large) batch is never copied.
+     * @param b A tuple containing a single field, which is a bag holding the next batch
+     *          of tuples to consider
+     * @throws IOException if the batch cannot be read
+     */
+    @Override
+    public void accumulate(Tuple b) throws IOException {
+        try{
+            Tuple batchMax = max((DataBag) b.get(0), reporter);
+
+            if (intermediate == null) {
+                // First non-empty batch (or still nothing seen): take the batch result as-is.
+                intermediate = batchMax;
+            } else if (batchMax != null) {
+                // Reuse max() on a two-element bag so the comparison rules stay identical;
+                // the batch result is added before the previous result, as before.
+                DataBag candidates = BagFactory.getInstance().newDefaultBag();
+                candidates.add(batchMax);
+                candidates.add(intermediate);
+                intermediate = max(candidates, reporter);
+            }
+            // An empty batch (batchMax == null) leaves the running maximum unchanged.
+        }catch (ExecException ee){
+            throw new IOException(ee);
+        }
+    }
+
+    /**
+     * Returns the maximum tuple accumulated so far, or null when nothing has been
+     * accumulated since construction or the last cleanup().
+     */
+    @Override
+    public Tuple getValue() {
+        final Tuple current = this.intermediate;
+        return current;
+    }
+
+    /**
+     * Discards the running maximum so a subsequent sequence of accumulate() calls
+     * starts from scratch.
+     */
+    @Override
+    public void cleanup() {
+        this.intermediate = null;
+    }
+
 }

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java Fri Feb 24 08:19:42 2017
@@ -23,10 +23,13 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.BigDecimalAvg;
+import org.apache.pig.builtin.BigDecimalMax;
+import org.apache.pig.builtin.BigDecimalMin;
+import org.apache.pig.builtin.BigDecimalSum;
 import org.apache.pig.builtin.COUNT;
 import org.apache.pig.builtin.DoubleAvg;
 import org.apache.pig.builtin.DoubleMax;
@@ -54,6 +57,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 /**
  * Given an aggregate function, a bag, and possibly a window definition,
@@ -73,23 +77,27 @@ import org.apache.pig.impl.logicalLayer.
  *           <li>sum(int)</li>
  *           <li>sum(long)</li>
  *           <li>sum(bytearray)</li>
+ *           <li>sum(bigdecimal)</li>
  *           <li>avg(double)</li>
  *           <li>avg(float)</li>
  *           <li>avg(long)</li>
  *           <li>avg(int)</li>
  *           <li>avg(bytearray)</li>
+ *           <li>avg(bigdecimal)</li>
  *           <li>min(double)</li>
  *           <li>min(float)</li>
  *           <li>min(long)</li>
  *           <li>min(int)</li>
  *           <li>min(chararray)</li>
  *           <li>min(bytearray)</li>
+ *           <li>min(bigdecimal)</li>
  *           <li>max(double)</li>
  *           <li>max(float)</li>
  *           <li>max(long)</li>
  *           <li>max(int)</li>
  *           <li>max(chararray)</li>
  *           <li>max(bytearray)</li>
+ *           <li>max(bigdecimal)</li>
  *           <li>row_number</li>
  *           <li>first_value</li>
  *           <li>last_value</li>
@@ -153,7 +161,8 @@ import org.apache.pig.impl.logicalLayer.
  * current row and 3 following) over T;</tt>
  *
  * <p>Over accepts a constructor argument specifying the name and type,
- * colon-separated, of its return schema.</p>
+ * colon-separated, of its return schema. If the argument is 'true', Over instead searches the
+ * input bag's schema for its innermost field and returns a schema with alias+'_over' and the same type.</p>
  *
  * <p><pre>
  * DEFINE IOver org.apache.pig.piggybank.evaluation.Over('state_rk:int');
@@ -188,12 +197,14 @@ public class Over extends EvalFunc<DataB
     private Object[] udfArgs;
     private byte   returnType;
     private String returnName;
+    private boolean searchInnerType;
 
     public Over() {
         initialized = false;
         udfArgs = null;
         func = null;
+        // With no declared return type, outputSchema() reports a bag of nulls.
         returnType = DataType.UNKNOWN;
+        // Inner-type search is only enabled by the Over(String) constructor given "true".
+        searchInnerType = false;
     }
 
     public Over(String typespec) {
@@ -202,12 +213,16 @@ public class Over extends EvalFunc<DataB
             String[] fn_tn = typespec.split(":", 2);
             this.returnName = fn_tn[0];
             this.returnType = DataType.findTypeByName(fn_tn[1]);
-        } else {
+        } else if(Boolean.parseBoolean(typespec)) {
+            searchInnerType = Boolean.parseBoolean(typespec);
+        }else{
             this.returnName = "result";
             this.returnType = DataType.findTypeByName(typespec);
-        }
+        }       
     }
 
+
+
     @Override
     public DataBag exec(Tuple input) throws IOException {
         if (input == null || input.size() < 2) {
@@ -255,19 +270,42 @@ public class Over extends EvalFunc<DataB
     @Override
     public Schema outputSchema(Schema inputSch) {
         try {
-            if (returnType == DataType.UNKNOWN) {
+            FieldSchema field;
+
+            if (searchInnerType) {
+                // Inner-type search: descend through single-field tuple/bag
+                // wrappers to the innermost field and return it renamed to
+                // alias + "_over". A multi-field schema is returned unchanged.
+                // A local loop flag is used instead of toggling the
+                // searchInnerType field, so the UDF's configuration is never
+                // left disabled if getField() throws part-way through.
+                field = new FieldSchema(inputSch.getField(0));
+                boolean descending = true;
+                while (descending) {
+                    if (field.schema != null
+                            && field.schema.getFields().size() > 1) {
+                        descending = false;
+                    } else if ((field.type == DataType.TUPLE
+                            || field.type == DataType.BAG)
+                            && field.schema != null
+                            && field.schema.getFields().size() == 1) {
+                        field = new FieldSchema(field.schema.getField(0));
+                    } else {
+                        // Leaf field, or a tuple/bag without an inner schema
+                        // (the latter used to raise a NullPointerException).
+                        field.alias = field.alias + "_over";
+                        descending = false;
+                    }
+                }
+            } else if (returnType == DataType.UNKNOWN) {
                 return Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
             } else {
-                Schema outputTupleSchema = new Schema(new Schema.FieldSchema(returnName, returnType));
-                return new Schema(new Schema.FieldSchema(
-                        getSchemaName(this.getClass().getName().toLowerCase(), inputSch),
-                            outputTupleSchema, 
-                            DataType.BAG));
+                field = new Schema.FieldSchema(returnName, returnType);
             }
+
+            // Wrap the chosen field as the single column of the output bag.
+            Schema outputTupleSchema = new Schema(field);
+            return new Schema(new Schema.FieldSchema(getSchemaName(this
+                    .getClass().getName().toLowerCase(), inputSch),
+                    outputTupleSchema, DataType.BAG));
+
         } catch (FrontendException fe) {
             throw new RuntimeException("Unable to create nested schema", fe);
         }
     }
+    
 
     private void init(Tuple input) throws IOException {
         initialized = true;
@@ -329,6 +367,8 @@ public class Over extends EvalFunc<DataB
             func = new LongSum();
         } else if ("sum(bytearray)".equalsIgnoreCase(agg)) {
             func = new SUM();
+        } else if ("sum(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalSum();
         } else if ("avg(double)".equalsIgnoreCase(agg)) {
             func = new DoubleAvg();
         } else if ("avg(float)".equalsIgnoreCase(agg)) {
@@ -339,6 +379,8 @@ public class Over extends EvalFunc<DataB
             func = new IntAvg();
         } else if ("avg(bytearray)".equalsIgnoreCase(agg)) {
             func = new AVG();
+        } else if ("avg(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalAvg();
         } else if ("min(double)".equalsIgnoreCase(agg)) {
             func = new DoubleMin();
         } else if ("min(float)".equalsIgnoreCase(agg)) {
@@ -351,6 +393,8 @@ public class Over extends EvalFunc<DataB
             func = new StringMin();
         } else if ("min(bytearray)".equalsIgnoreCase(agg)) {
             func = new MIN();
+        } else if ("min(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalMin();
         } else if ("max(double)".equalsIgnoreCase(agg)) {
             func = new DoubleMax();
         } else if ("max(float)".equalsIgnoreCase(agg)) {
@@ -363,6 +407,8 @@ public class Over extends EvalFunc<DataB
             func = new StringMax();
         } else if ("max(bytearray)".equalsIgnoreCase(agg)) {
             func = new MAX();
+        } else if ("max(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalMax();
         } else if ("row_number".equalsIgnoreCase(agg)) {
             func = new RowNumber();
         } else if ("first_value".equalsIgnoreCase(agg)) {

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java Fri Feb 24 08:19:42 2017
@@ -363,6 +363,15 @@ public class SearchEngineExtractor exten
         searchEngines.put("search.lycos.com", "Lycos");
         searchEngines.put("search.msn.co.uk", "MSN UK");
         searchEngines.put("search.msn.com", "MSN");
+        searchEngines.put("bing.com", "Bing");
+        searchEngines.put("ssl.bing.com", "Bing");
+        searchEngines.put("cn.bing.com", "Bing China");
+        searchEngines.put("br.bing.com", "Bing Brazil");
+        searchEngines.put("it.bing.com", "Bing Italy");
+        searchEngines.put("be.bing.com", "Bing Netherlands");
+        searchEngines.put("uk.bing.com", "Bing UK");
+        searchEngines.put("hk.bing.com", "Bing Hong Kong");
+        searchEngines.put("nz.bing.com", "Bing New Zeland");
         searchEngines.put("search.myway.com", "MyWay");
         searchEngines.put("search.mywebsearch.com", "My Web Search");
         searchEngines.put("search.ntlworld.com", "NTLWorld");

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java Fri Feb 24 08:19:42 2017
@@ -16,8 +16,11 @@ package org.apache.pig.piggybank.evaluat
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
+import javax.xml.XMLConstants;
+import javax.xml.namespace.NamespaceContext;
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 import javax.xml.xpath.XPathFactory;
@@ -49,8 +52,7 @@ public class XPath extends EvalFunc<Stri
     
     private static boolean cache = true;
     private static boolean ignoreNamespace = true;
-    public static final String EMPTY_STRING = "";
-    
+
     /**
      * input should contain: 1) xml 2) xpath 
      *                       3) optional cache xml doc flag 
@@ -95,8 +97,13 @@ public class XPath extends EvalFunc<Stri
                 return null;
             }
             
-            if(input.size() > 2)
+            if(input.size() > 2) {
                 cache = (Boolean) input.get(2);
+            }
+
+            if (input.size() > 3) {
+                ignoreNamespace = (Boolean) input.get(3);
+            }
 
             if (!cache || xpath == null || !xml.equals(this.xml)) {
                 final InputSource source = new InputSource(new StringReader(xml));
@@ -104,6 +111,7 @@ public class XPath extends EvalFunc<Stri
                 this.xml = xml; // track the xml for subsequent calls to this udf
 
                 final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+                dbf.setNamespaceAware(!ignoreNamespace);
                 final DocumentBuilder db = dbf.newDocumentBuilder();
 
                 this.document = db.parse(source);
@@ -112,14 +120,32 @@ public class XPath extends EvalFunc<Stri
 
                 this.xpath = xpathFactory.newXPath();
 
+                if (!ignoreNamespace) {
+                    xpath.setNamespaceContext(new NamespaceContext() {
+                        @Override
+                        public String getNamespaceURI(String prefix) {
+                            if (prefix.equals(XMLConstants.DEFAULT_NS_PREFIX)) {
+                                return document.lookupNamespaceURI(null);
+                            } else {
+                                return document.lookupNamespaceURI(prefix);
+                            }
+                        }
+
+                        @Override
+                        public String getPrefix(String namespaceURI) {
+                            return document.lookupPrefix(namespaceURI);
+                        }
+
+                        @Override
+                        public Iterator getPrefixes(String namespaceURI) {
+                            return null;
+                        }
+                    });
+                }
             }
 
             String xpathString = (String) input.get(1);
 
-            if (ignoreNamespace) {
-                xpathString = createNameSpaceIgnoreXpathString(xpathString);
-            }
-
             final String value = xpath.evaluate(xpathString, document);
 
             return value;
@@ -165,34 +191,6 @@ public class XPath extends EvalFunc<Stri
         }
         return true;
     }
-    
-    
-    /**
-     * Returns a new the xPathString by adding additional parameters 
-     * in the existing xPathString for ignoring the namespace during compilation.
-     * 
-     * @param String xpathString
-     * @return String modified xpathString
-     */
-    private String createNameSpaceIgnoreXpathString(final String xpathString) {
-        final String QUERY_PREFIX = "//*";
-        final String LOCAL_PREFIX = "[local-name()='";
-        final String LOCAL_POSTFIX = "']";
-        final String SPLITTER = "/";
-
-        try {
-            String xpathStringWithLocalName = EMPTY_STRING;
-            String[] individualNodes = xpathString.split(SPLITTER);
-
-            for (String node : individualNodes) {
-                xpathStringWithLocalName = xpathStringWithLocalName.concat(QUERY_PREFIX + LOCAL_PREFIX + node
-                        + LOCAL_POSTFIX);
-            }
-            return xpathStringWithLocalName;
-        } catch (Exception ex) {
-            return xpathString;
-        }
-    }
 
     /**
      * Returns argument schemas of the UDF.

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java Fri Feb 24 08:19:42 2017
@@ -580,7 +580,7 @@ public class CSVExcelStorage extends Pig
                 }
             } else if (b == DOUBLE_QUOTE) {
                 // Does a double quote immediately follow?                  
-                if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE)) {
+                if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE) && (fieldBuffer.position() != 0)) {
                     fieldBuffer.put(b);
                     nextTupleSkipChar = true;
                     continue;

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java Fri Feb 24 08:19:42 2017
@@ -91,6 +91,7 @@ public class DBStorage extends StoreFunc
   /**
    * Write the tuple to Database directly here.
    */
+  @Override
   public void putNext(Tuple tuple) throws IOException {
     int sqlPos = 1;
     try {
@@ -373,4 +374,9 @@ public class DBStorage extends StoreFunc
       p.setProperty(SCHEMA_SIGNATURE, s.toString());
   }
 
+  /**
+   * Reports that this store function does not support parallel writes to its
+   * store location. NOTE(review): presumably because putNext() writes tuples
+   * straight to the database — confirm the intended rationale.
+   */
+  @Override
+  public Boolean supportsParallelWriteToStoreLocation() {
+    return Boolean.FALSE;
+  }
+
 }

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java Fri Feb 24 08:19:42 2017
@@ -60,7 +60,6 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.util.StorageUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DataByteArray;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 
 /**
  * <code>IndexedStorage</code> is a form of <code>PigStorage</code> that supports a

Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Fri Feb 24 08:19:42 2017
@@ -16,7 +16,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.text.NumberFormat;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,9 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.StorageUtil;
+import org.apache.xml.utils.StringBufferPool;
+
+import com.google.common.base.Strings;
 
 /**
  * The UDF is useful for splitting the output data into a bunch of directories
@@ -73,13 +78,21 @@ import org.apache.pig.impl.util.StorageU
  * If the output is compressed,then the sub directories and the output files will
  * be having the extension. Say for example in the above case if bz2 is used one file 
  * will look like ;/my/home/output.bz2/a1.bz2/a1-0000.bz2
+ *
+ * Key field can also be a comma separated list of indices e.g. '0,1' - in this case
+ * storage will be multi-level:
+ * /my/home/output/a1/b1/a1-b1-0000
+ * /my/home/output/a1/b2/a1-b2-0000
+ * There is also an option to leave key values out of storage, see isRemoveKeys.
  */
 public class MultiStorage extends StoreFunc {
 
+  private static final String KEYFIELD_DELIMETER = ",";
   private Path outputPath; // User specified output Path
-  private int splitFieldIndex = -1; // Index of the key field
+  private final List<Integer> splitFieldIndices= new ArrayList<Integer>(); // Indices of the key fields
   private String fieldDel; // delimiter of the output record.
   private Compression comp; // Compression type of output data.
+  private boolean isRemoveKeys = false;
   
   // Compression types supported by this store
   enum Compression {
@@ -95,9 +108,14 @@ public class MultiStorage extends StoreF
     this(parentPathStr, splitFieldIndex, compression, "\\t");
   }
 
+  /**
+   * Convenience constructor that keeps the key columns in the written records;
+   * it delegates to the five-argument constructor with isRemoveKeys = "false".
+   */
+  public MultiStorage(String parentPathStr, String splitFieldIndex,
+      String compression, String fieldDel) {
+    this(parentPathStr, splitFieldIndex,
+        compression, fieldDel,
+        "false");
+  }
+
   /**
    * Constructor
-   * 
+   *
    * @param parentPathStr
    *          Parent output dir path (this will be specified in store statement,
    *            so MultiStorage don't use this parameter in reality. However, we don't
@@ -108,18 +126,26 @@ public class MultiStorage extends StoreF
    *          'bz2', 'bz', 'gz' or 'none'
    * @param fieldDel
    *          Output record field delimiter.
+   * @param isRemoveKeys
+   *          Removes key columns from result during write.
    */
   public MultiStorage(String parentPathStr, String splitFieldIndex,
-      String compression, String fieldDel) {
+                      String compression, String fieldDel, String isRemoveKeys) {
+    this.isRemoveKeys = Boolean.parseBoolean(isRemoveKeys);
     this.outputPath = new Path(parentPathStr);
-    this.splitFieldIndex = Integer.parseInt(splitFieldIndex);
+
+    // splitFieldIndex is a single index or a comma-separated list of indices;
+    // each index adds one directory level. Tokens are trimmed so that "0, 1"
+    // is accepted as well as "0,1".
+    String[] indexTokens = splitFieldIndex.split(KEYFIELD_DELIMETER);
+    for (String indexToken : indexTokens){
+      this.splitFieldIndices.add(Integer.parseInt(indexToken.trim()));
+    }
+
     this.fieldDel = fieldDel;
     try {
       this.comp = (compression == null) ? Compression.none : Compression
-        .valueOf(compression.toLowerCase());
+              .valueOf(compression.toLowerCase());
     } catch (IllegalArgumentException e) {
       System.err.println("Exception when converting compression string: "
-          + compression + " to enum. No compression will be used");
+              + compression + " to enum. No compression will be used");
       this.comp = Compression.none;
     }
   }
@@ -127,22 +153,26 @@ public class MultiStorage extends StoreF
   //--------------------------------------------------------------------------
   // Implementation of StoreFunc
 
-  private RecordWriter<String, Tuple> writer;
+  private RecordWriter<List<String>, Tuple> writer;
   
   @Override
   public void putNext(Tuple tuple) throws IOException {
-    if (tuple.size() <= splitFieldIndex) {
-      throw new IOException("split field index:" + this.splitFieldIndex
-          + " >= tuple size:" + tuple.size());
+    for (int splitFieldIndex : this.splitFieldIndices) {
+      if (tuple.size() <= splitFieldIndex) {
+        throw new IOException("split field index:" + splitFieldIndex
+                + " >= tuple size:" + tuple.size());
+      }
     }
-    Object field = null;
-    try {
-      field = tuple.get(splitFieldIndex);
-    } catch (ExecException exec) {
-      throw new IOException(exec);
+    List<String> fields = new ArrayList<String>();
+    for (int splitFieldIndex : this.splitFieldIndices){
+      try {
+        fields.add(String.valueOf(tuple.get(splitFieldIndex)));
+      } catch (ExecException exec) {
+        throw new IOException(exec);
+      }
     }
     try {
-      writer.write(String.valueOf(field), tuple);
+      writer.write(fields, tuple);
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
@@ -153,6 +183,9 @@ public class MultiStorage extends StoreF
   public OutputFormat getOutputFormat() throws IOException {
       MultiStorageOutputFormat format = new MultiStorageOutputFormat();
       format.setKeyValueSeparator(fieldDel);
+      // With isRemoveKeys set, the record writer omits the key columns from each output line.
+      if (this.isRemoveKeys){
+        format.setSkipIndices(this.splitFieldIndices);
+      }
       return format;
   }
     
@@ -174,27 +207,33 @@ public class MultiStorage extends StoreF
       FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
     }
   }
- 
+
+  /**
+   * Reports that this store function does not support parallel writes to its
+   * store location. NOTE(review): presumably because each task creates its own
+   * per-key output files — confirm the intended rationale.
+   */
+  @Override
+  public Boolean supportsParallelWriteToStoreLocation() {
+    return Boolean.FALSE;
+  }
+
   //--------------------------------------------------------------------------
   // Implementation of OutputFormat
   
   public static class MultiStorageOutputFormat extends
-  TextOutputFormat<String, Tuple> {
+  TextOutputFormat<List<String>, Tuple> {
 
     private String keyValueSeparator = "\\t";
     private byte fieldDel = '\t';
-  
+    private List<Integer> skipIndices = null;
+
     @Override
-    public RecordWriter<String, Tuple> 
+    public RecordWriter<List<String>, Tuple>
     getRecordWriter(TaskAttemptContext context
                 ) throws IOException, InterruptedException {
     
       final TaskAttemptContext ctx = context;
         
-      return new RecordWriter<String, Tuple>() {
+      return new RecordWriter<List<String>, Tuple>() {
 
-        private Map<String, MyLineRecordWriter> storeMap = 
-              new HashMap<String, MyLineRecordWriter>();
+        private Map<List<String>, MyLineRecordWriter> storeMap =
+              new HashMap<List<String>, MyLineRecordWriter>();
           
         private static final int BUFFER_SIZE = 1024;
           
@@ -202,7 +241,7 @@ public class MultiStorage extends StoreF
               new ByteArrayOutputStream(BUFFER_SIZE);
                            
         @Override
-        public void write(String key, Tuple val) throws IOException {                
+        public void write(List<String> key, Tuple val) throws IOException {
           int sz = val.size();
           for (int i = 0; i < sz; i++) {
             Object field;
@@ -212,9 +251,13 @@ public class MultiStorage extends StoreF
               throw ee;
             }
 
-            StorageUtil.putField(mOut, field);
+            boolean skipCurrentField = skipIndices != null && skipIndices.contains(i);
 
-            if (i != sz - 1) {
+            if (!skipCurrentField) {
+              StorageUtil.putField(mOut, field);
+            }
+
+            if (i != sz - 1 && !skipCurrentField) {
               mOut.write(fieldDel);
             }
           }
@@ -231,17 +274,17 @@ public class MultiStorage extends StoreF
           }
         }
       
-        private MyLineRecordWriter getStore(String fieldValue) throws IOException {
-          MyLineRecordWriter store = storeMap.get(fieldValue);
+        private MyLineRecordWriter getStore(List<String> fieldValues) throws IOException {
+          MyLineRecordWriter store = storeMap.get(fieldValues);
           if (store == null) {                  
-            DataOutputStream os = createOutputStream(fieldValue);
+            DataOutputStream os = createOutputStream(fieldValues);
             store = new MyLineRecordWriter(os, keyValueSeparator);
-            storeMap.put(fieldValue, store);
+            storeMap.put(fieldValues, store);
           }
           return store;
         }
           
-        private DataOutputStream createOutputStream(String fieldValue) throws IOException {
+        private DataOutputStream createOutputStream(List<String> fieldValues) throws IOException {
           Configuration conf = ctx.getConfiguration();
           TaskID taskId = ctx.getTaskAttemptID().getTaskID();
           
@@ -259,7 +302,21 @@ public class MultiStorage extends StoreF
           NumberFormat nf = NumberFormat.getInstance();
           nf.setMinimumIntegerDigits(4);
 
-          Path path = new Path(fieldValue+extension, fieldValue + '-'
+          StringBuffer pathStringBuffer = new StringBuffer();
+          for (String fieldValue : fieldValues){
+            String safeFieldValue = fieldValue.replaceAll("\\/","-");
+            pathStringBuffer.append(safeFieldValue);
+            pathStringBuffer.append("/");
+          }
+          pathStringBuffer.deleteCharAt(pathStringBuffer.length()-1);
+          String pathString = pathStringBuffer.toString();
+          String idString = pathString.replaceAll("\\/","-");
+
+          if (!Strings.isNullOrEmpty(extension)){
+            pathString = pathString.replaceAll("\\/",extension+"\\/");
+          }
+
+          Path path = new Path(pathString+extension, idString + '-'
                 + nf.format(taskId.getId())+extension);
           Path workOutputPath = ((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath();
           Path file = new Path(workOutputPath, path);
@@ -279,8 +336,12 @@ public class MultiStorage extends StoreF
       keyValueSeparator = sep;
       fieldDel = StorageUtil.parseFieldDel(keyValueSeparator);  
     }
-  
-  //------------------------------------------------------------------------
+
+    public void setSkipIndices(List<Integer> skipIndices) {
+      this.skipIndices = skipIndices;
+    }
+
+    //------------------------------------------------------------------------
   //
   
     protected static class MyLineRecordWriter

Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java Fri Feb 24 08:19:42 2017
@@ -18,12 +18,11 @@
 package org.apache.pig.piggybank.evaluation;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -34,8 +33,6 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestOver {
@@ -66,11 +63,25 @@ public class TestOver {
         out = func.outputSchema(in);
         assertEquals("{org.apache.pig.piggybank.evaluation.over_3: {result: double}}", out.toString());
 
+        // bigdecimal
+        func = new Over("BIGDECIMAL");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+        out = func.outputSchema(in);
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {result: bigdecimal}}", out.toString());
+        
         // named 
         func = new Over("bob:chararray");
         in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
         out = func.outputSchema(in);
-        assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {bob: chararray}}", out.toString());
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_5: {bob: chararray}}", out.toString());
+        
+        
+        // Search inner alias and type
+        func = new Over("true");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL);
+        in.getField(0).schema.getField(0).alias="test";
+        out = func.outputSchema(in);
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_6: {test_over: bigdecimal}}", out.toString());
     }
 
     @Test
@@ -397,6 +408,28 @@ public class TestOver {
             assertEquals(new Long(10), to.get(0));
         }
     }
+    
+    @Test
+    public void testSumBigDecimal() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, BigDecimal.ONE);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(bigdecimal)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(BigDecimal.TEN, to.get(0)); // every row sees the sum of all ten 1s
+        }
+    }
 
     @Test
     public void testAvgDouble() throws Exception {
@@ -509,6 +542,29 @@ public class TestOver {
     }
     
     @Test
+    public void testAvgBigDecimal() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, BigDecimal.valueOf(i));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "avg(bigdecimal)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new BigDecimal("4.5"), to.get(0)); // (0+1+...+9)/10, exact decimal literal
+        }
+    }
+    
+    
+    @Test
     public void testMinDouble() throws Exception {
         Over func = new Over();
         DataBag inbag = BagFactory.getInstance().newDefaultBag();
@@ -627,6 +683,26 @@ public class TestOver {
             assertEquals("0", to.get(0));
         }
     }
+    
+    @Test
+    public void testMinBigDecimal() throws Exception {
+        TupleFactory tf = TupleFactory.getInstance();
+        DataBag values = BagFactory.getInstance().newDefaultBag();
+        for (int n = 0; n < 10; n++) {
+            Tuple row = tf.newTuple(1);
+            row.set(0, BigDecimal.valueOf(n));
+            values.add(row);
+        }
+        Tuple args = tf.newTuple(2);
+        args.set(0, values);
+        args.set(1, "min(bigdecimal)");
+        DataBag result = new Over().exec(args);
+        assertEquals(10, result.size());
+        for (Tuple out : result) { // 0 is the smallest input, so every row expects 0
+            assertEquals(1, out.size());
+            assertEquals(BigDecimal.ZERO, out.get(0));
+        }
+    }
 
     @Test
     public void testMaxDouble() throws Exception {
@@ -754,6 +830,28 @@ public class TestOver {
             assertEquals("9", to.get(0));
         }
     }
+    
+    @Test
+    public void testMaxBigDecimal() throws Exception {
+        TupleFactory tf = TupleFactory.getInstance();
+        DataBag values = BagFactory.getInstance().newDefaultBag();
+        for (int n = 0; n < 10; n++) {
+            Tuple row = tf.newTuple(1);
+            row.set(0, BigDecimal.valueOf(n));
+            values.add(row);
+        }
+        Tuple args = tf.newTuple(2);
+        args.set(0, values);
+        args.set(1, "max(bigdecimal)");
+        DataBag result = new Over().exec(args);
+        assertEquals(10, result.size());
+        long expected = 0;
+        for (Tuple out : result) { // inputs ascend, so row k expects k
+            assertEquals(1, out.size());
+            assertEquals(BigDecimal.valueOf(expected++), out.get(0));
+        }
+    }
+	
 
     @Test
     public void testRowNumber() throws Exception {

Added: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java (added)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,95 @@
+package org.apache.pig.piggybank.test.evaluation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.common.collect.Lists;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.piggybank.evaluation.MaxTupleBy1stField;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestMaxTupleBy1stField {
+
+    private static final List<Tuple> INPUT_TUPLES = new ArrayList<>();
+    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        INPUT_TUPLES.clear(); // keep the fixture idempotent if the class is re-run in one JVM
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(0L, "Fruit", "orange", 21F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(1L, "Fruit", "apple", 9.9F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(2L, "Vegetable", "paprika", 30F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(3L, "Fruit", "blueberry", 40F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(4L, "Vegetable", "carrot", 50F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(5L, "Fruit", "blueberry", 41F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(6L, "Vegetable", "paprika", 31F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(7L, "Fruit", "orange", 20.5F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(8L, "Fruit", "apple", 10.1F)));
+        INPUT_TUPLES.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(9L, "Fruit", "apple", 10.2F)));
+    }
+
+    @Test
+    public void testExecFunc() throws Exception {
+        MaxTupleBy1stField udf = new MaxTupleBy1stField();
+        Tuple inputTuple = createTupleFromInputList(0, INPUT_TUPLES.size());
+        Tuple result = udf.exec(inputTuple);
+        Assert.assertEquals("apple", result.get(2));
+        Assert.assertEquals(10.2F, (Float) result.get(3), 1E-6);
+    }
+
+    @Test
+    public void testAccumulator() throws Exception {
+        MaxTupleBy1stField udf = new MaxTupleBy1stField();
+        // getValue() must reflect every batch accumulated since the last cleanup().
+        Tuple inputTuple = createTupleFromInputList(0, 3);
+        udf.accumulate(inputTuple);
+        Tuple result = udf.getValue();
+        Assert.assertEquals("paprika", result.get(2));
+        Assert.assertEquals(30F, (Float) result.get(3), 1E-6);
+
+        inputTuple = createTupleFromInputList(3, 6);
+        udf.accumulate(inputTuple);
+        result = udf.getValue();
+        Assert.assertEquals("apple", result.get(2));
+        Assert.assertEquals(10.1F, (Float) result.get(3), 1E-6);
+
+        udf.cleanup();
+        Assert.assertNull(udf.getValue());
+    }
+
+    /** Wraps INPUT_TUPLES[offset, offset + length) in a bag and returns it as a one-field tuple. */
+    private static Tuple createTupleFromInputList(int offset, int length) {
+        DataBag inputBag = BAG_FACTORY.newDefaultBag();
+        for (int i = offset; i < offset+length; ++i) {
+            inputBag.add(INPUT_TUPLES.get(i));
+        }
+        Tuple inputTuple = TUPLE_FACTORY.newTuple();
+        inputTuple.append(inputBag);
+        return inputTuple;
+    }
+
+}

Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java Fri Feb 24 08:19:42 2017
@@ -151,6 +151,27 @@ public class XPathTest {
     }
 
     @Test
+    public void testExecTupleWithDontIgnoreNamespace() throws Exception {
+
+        final String xml = "<?xml version='1.0'?>\n"
+                + "<foo:document xmlns:foo=\"http://apache.org/foo\" xmlns:bar=\"http://apache.org/bar\">"
+                + "<bar:element>MyBar</bar:element>"
+                + "</foo:document>";
+
+        final Tuple tuple = mock(Tuple.class);
+        when(tuple.size()).thenReturn(4);
+        when(tuple.get(0)).thenReturn(xml);
+        when(tuple.get(1)).thenReturn("/foo:document/bar:element");
+        when(tuple.get(2)).thenReturn(true);
+        // Index 3 is false, which (per the test name) should keep namespace prefixes significant.
+        when(tuple.get(3)).thenReturn(false);
+
+        final XPath xpath = new XPath();
+        assertEquals("MyBar", xpath.exec(tuple));
+
+    }
+
+    @Test
     public void testExecTupleWithElementNodeWithComplexNameSpace() throws Exception {
 
         final XPath xpath = new XPath();
@@ -210,7 +231,31 @@ public class XPathTest {
         assertEquals("4 stars3.5 stars4 stars4.2 stars3.5 stars", xpath.exec(tuple));
 
     }
-    
+
+    @Test
+    public void testFunctionInXPath() throws Exception {
+
+        // Four <Bb Cc="1"/> children, so sum(Aa/Bb/@Cc) is expected to evaluate to "4".
+        final String xml = "<Aa name=\"test1\">"
+                + "<Bb Cc=\"1\"/>"
+                + "<Bb Cc=\"1\"/>"
+                + "<Bb Cc=\"1\"/>"
+                + "<Bb Cc=\"1\"/>"
+                + "<Dd>test2</Dd>"
+                + "</Aa>";
+
+        final Tuple tuple = mock(Tuple.class);
+        when(tuple.size()).thenReturn(4);
+        when(tuple.get(0)).thenReturn(xml);
+        when(tuple.get(1)).thenReturn("sum(Aa/Bb/@Cc)");
+        when(tuple.get(2)).thenReturn(true);
+        when(tuple.get(3)).thenReturn(true);
+
+        final XPath xpath = new XPath();
+        assertEquals("4", xpath.exec(tuple));
+
+    }
+
     @Ignore //--optional test
     @Test 
     public void testCacheBenefit() throws Exception{

Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java Fri Feb 24 08:19:42 2017
@@ -218,7 +218,7 @@ public class TestCSVExcelStorage  {
         Util.registerMultiLineQuery(pig, script);
         Iterator<Tuple> it = pig.openIterator("a");
         Assert.assertEquals(Util.createTuple(new String[] {"foo,\"bar\",baz"}), it.next());
-        Assert.assertEquals(Util.createTuple(new String[] {"\"\"\"\""}), it.next());
+        Assert.assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
     }
 
     // Handle newlines in fields

Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java Fri Feb 24 08:19:42 2017
@@ -109,7 +109,7 @@ public class TestLogFormatLoader {
         Tuple actual = out.get(0);
         Tuple expected = tuple(
             "2001:980:91c0:1:8d31:a232:25e5:85d",
-            "[05/Sep/2010:11:27:50 +0200]",
+            "05/Sep/2010:11:27:50 +0200",
             "koken-pannen_303_hs-koken-pannen-afj-120601_B3_product_1_9200000002876066",
             map(
                 "promo"       , "koken-pannen_303_hs-koken-pannen-afj-120601_B3_product_1_9200000002876066",

Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java Fri Feb 24 08:19:42 2017
@@ -18,34 +18,41 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-public class TestMultiStorage extends TestCase {
+public class TestMultiStorage {
   private static final String INPUT_FILE = "MultiStorageInput.txt";
 
   private PigServer pigServer;
   private PigServer pigServerLocal;
 
-  private MiniCluster cluster = MiniCluster.buildCluster();
+  private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
-  public TestMultiStorage() throws ExecException, IOException {
-    pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
-    pigServerLocal = new PigServer(ExecType.LOCAL);
+  public TestMultiStorage() throws Exception {
+    pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+    pigServerLocal = new PigServer(Util.getLocalTestMode());
   }
 
   public static final PathFilter hiddenPathFilter = new PathFilter() {
@@ -74,59 +81,83 @@ public class TestMultiStorage extends Te
     Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
   }
 
-  @Override
   @Before
   public void setUp() throws Exception {
     createFile();
     FileSystem fs = FileSystem.getLocal(new Configuration());
     Path localOut = new Path("local-out");
-    Path dummy = new Path("dummy");
     if (fs.exists(localOut)) {
       fs.delete(localOut, true);
     }
-    if (fs.exists(dummy)) {
-      fs.delete(dummy, true);
-    }
   }
 
-  @Override
   @After
   public void tearDown() throws Exception {
     new File(INPUT_FILE).delete();
     Util.deleteFile(cluster, INPUT_FILE);
+  }
+
+  @AfterClass
+  public static void shutdown() {
     cluster.shutDown();
   }
 
   enum Mode {
     local, cluster
-  };
+  }
 
   @Test
   public void testMultiStorage() throws IOException {
     final String LOAD = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);";
     final String MULTI_STORE_CLUSTER = "STORE A INTO 'mr-out' USING "
         + "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');";
-    final String MULTI_STORE_LOCAL = "STORE A INTO 'dummy' USING "
+    final String MULTI_STORE_LOCAL = "STORE A INTO 'local-out' USING "
         + "org.apache.pig.piggybank.storage.MultiStorage('local-out', '1');";
 
     System.out.print("Testing in LOCAL mode: ...");
-    //testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
+    testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
     System.out.println("Succeeded!");
-    
+
     System.out.print("Testing in CLUSTER mode: ...");
     testMultiStorage( Mode.cluster, "mr-out", LOAD, MULTI_STORE_CLUSTER);
     System.out.println("Succeeded!");
-    
-    
   }
 
-  /**
-   * The actual method that run the test in local or cluster mode. 
-   * 
-   * @param pigServer
-   * @param mode
-   * @param queries
-   * @throws IOException
+  @Test
+  public void testOutputStats() throws IOException {
+    FileSystem fs = cluster.getFileSystem();
+    try {
+      pigServer.setBatchOn();
+      pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
+      pigServer.registerQuery("B = FILTER A BY name == 'apple';");
+      pigServer.registerQuery("STORE A INTO 'out1' USING org.apache.pig.piggybank.storage.MultiStorage('out1', '1');"); //153 bytes
+      pigServer.registerQuery("STORE B INTO 'out2' USING org.apache.pig.piggybank.storage.MultiStorage('out2', '1');"); // 45 bytes
+      ExecJob job = pigServer.executeBatch().get(0);
+      PigStats stats = job.getStatistics();
+      PigStats.JobGraph jobGraph = stats.getJobGraph();
+      JobStats jobStats = (JobStats) jobGraph.getSinks().get(0);
+      Map<String, Long> multiStoreCounters = jobStats.getMultiStoreCounters();
+      List<OutputStats> outputStats = stats.getOutputStats();
+      // Check the size before indexing so a missing output fails with a clear assertion.
+      assertEquals(2, outputStats.size()); // 2 split conditions
+      OutputStats outputStats1 = "out1".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+      OutputStats outputStats2 = "out2".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+      assertEquals(153 + 45, stats.getBytesWritten());
+      assertEquals(153, outputStats1.getBytes());
+      assertEquals(45, outputStats2.getBytes());
+      assertEquals(9, outputStats1.getRecords());
+      assertEquals(3, outputStats2.getRecords());
+      assertEquals(3L, multiStoreCounters.get("Output records in _1_out2").longValue());
+      assertEquals(9L, multiStoreCounters.get("Output records in _0_out1").longValue());
+    } finally {
+      // Remove the outputs even when an assertion fails so later runs start clean.
+      fs.delete(new Path("out1"), true);
+      fs.delete(new Path("out2"), true);
+    }
+  }
+
+    /**
+   * The actual method that run the test in local or cluster mode.
    */
   private void testMultiStorage( Mode mode, String outPath,
       String... queries) throws IOException {
@@ -142,42 +173,38 @@ public class TestMultiStorage extends Te
   /**
    * Test if records are split into directories corresponding to split field
    * values
-   * 
-   * @param mode
-   * @throws IOException
    */
   private void verifyResults(Mode mode, String outPath) throws IOException {
     FileSystem fs = (Mode.local == mode ? FileSystem
         .getLocal(new Configuration()) : cluster.getFileSystem());
     Path output = new Path(outPath);
-    Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+    assertTrue("Output dir does not exists!", fs.exists(output)
         && fs.getFileStatus(output).isDir());
 
     Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
-    Assert.assertTrue("Split field dirs not found!", paths != null);
+    assertTrue("Split field dirs not found!", paths != null);
 
     for (Path path : paths) {
       String splitField = path.getName();
       Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
-      Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+      assertTrue("No files found for path: " + path.toUri().getPath(),
           files != null);
       for (Path filePath : files) {
-        Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-        
+        assertTrue("This shouldn't be a directory", fs.isFile(filePath));
         BufferedReader reader = new BufferedReader(new InputStreamReader(fs
                 .open(filePath)));
         String line = "";
         int count = 0;
         while ((line = reader.readLine()) != null) {
           String[] fields = line.split("\\t");
-          Assert.assertEquals(fields.length, 3);
-          Assert.assertEquals("Unexpected field value in the output record",
+          assertEquals(fields.length, 3);
+          assertEquals("Unexpected field value in the output record",
                 splitField, fields[1]);
           count++;
           System.out.println("field: " + fields[1]);
-        }        
+        }
         reader.close();
-        Assert.assertEquals(count, 3);
+        assertEquals(count, 3);
       }
     }
   }

Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java Fri Feb 24 08:19:42 2017
@@ -21,11 +21,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.BZip2Codec;
@@ -37,6 +40,10 @@ import org.apache.pig.backend.executione
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.test.Util;
 
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+
 public class TestMultiStorageCompression extends TestCase {
 
    private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
@@ -59,8 +66,8 @@ public class TestMultiStorageCompression
       filesToDelete.add(outputPath);
 
       try {
-         runQuery(outputPath, type);
-         verifyResults(type, filesToDelete, outputPath);
+         runQuery(outputPath, "0", type);
+         verifyResults(type, outputPath);
       } finally {
          cleanUpDirs(filesToDelete);
       }
@@ -77,22 +84,22 @@ public class TestMultiStorageCompression
       filesToDelete.add(outputPath);
 
       try {
-         runQuery(outputPath, type);
-         verifyResults(type, filesToDelete, outputPath);
+         runQuery(outputPath, "0", type);
+         verifyResults(type, outputPath);
       } finally {
          cleanUpDirs(filesToDelete);
       }
    }
 
-   private void cleanUpDirs(List<String> filesToDelete) {
+   private void cleanUpDirs(List<String> filesToDelete) throws IOException {
       // Delete files recursively
       Collections.reverse(filesToDelete);
       for (String string : filesToDelete)
-         new File(string).delete();
+         FileUtils.deleteDirectory(new File(string));
    }
 
 
-   private void verifyResults(String type, List<String> filesToDelete,
+   private void verifyResults(String type,
          String outputPath) throws IOException, FileNotFoundException {
       // Verify the output
       File outputDir = new File(outputPath);
@@ -114,12 +121,10 @@ public class TestMultiStorageCompression
              continue;
          String topFolder = outputPath + File.separator + indexFolder;
          File indexFolderFile = new File(topFolder);
-         filesToDelete.add(topFolder);
          String[] list = indexFolderFile.list();
          for (String outputFile : list) {
 
             String file = topFolder + File.separator + outputFile;
-            filesToDelete.add(file);
 
             // Skip off any file starting with .
             if (outputFile.startsWith("."))
@@ -159,7 +164,7 @@ public class TestMultiStorageCompression
       }
    }
 
-   private void runQuery(String outputPath, String compressionType)
+   private void runQuery(String outputPath, String keyColIndices, String compressionType)
          throws Exception, ExecException, IOException, FrontendException {
 
       // create a data file
@@ -172,7 +177,7 @@ public class TestMultiStorageCompression
 
       String query2 = "STORE A INTO '" + Util.encodeEscape(outputPath)
             + "' USING org.apache.pig.piggybank.storage.MultiStorage" + "('"
-            + Util.encodeEscape(outputPath) + "','0', '" + compressionType + "', '\\t');";
+            + Util.encodeEscape(outputPath) + "','"+keyColIndices+"', '" + compressionType + "', '\\t');";
 
       // Run Pig
       pig.setBatchOn();
@@ -182,5 +187,32 @@ public class TestMultiStorageCompression
       pig.executeBatch();
    }
 
+   public void testMultiStorageShouldSupportMultiLevelAndGz() throws Exception {
+      String type = "gz";
+      String outputDir = "output001.multi." + type;
+      List<String> filesToDelete = new ArrayList<String>();
+
+      String tmpDir = System.getProperty("java.io.tmpdir");
+      String outputPath = tmpDir + File.separator + outputDir;
+
+      filesToDelete.add(outputPath);
+      try {
+         runQuery(outputPath, "1,0", type);
+         Collection<File> fileList = FileUtils.listFiles(new File(outputPath),null,true);
+         Set<String> expectedPaths = Sets.newHashSet( "output001.multi.gz/a.gz/f1.gz/a-f1-0,000.gz",
+                                                      "output001.multi.gz/b.gz/f2.gz/b-f2-0,000.gz",
+                                                      "output001.multi.gz/c.gz/f3.gz/c-f3-0,000.gz",
+                                                      "output001.multi.gz/d.gz/f4.gz/d-f4-0,000.gz");
+         for (File file : fileList){
+            // Normalise to '/' so the comparison does not depend on the platform separator.
+            String absolutePath = file.getAbsolutePath().replace(File.separatorChar, '/');
+            // Set.remove() is a no-op for files that are not in the expected set.
+            expectedPaths.remove(absolutePath.substring(absolutePath.indexOf(outputDir)));
+         }
+         Assert.assertTrue("Missing output files: " + expectedPaths, expectedPaths.isEmpty());
+      } finally {
+         cleanUpDirs(filesToDelete);
+      }
+   }
 
 }

Added: pig/branches/spark/dev-support/docker/Dockerfile
URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/Dockerfile?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/dev-support/docker/Dockerfile (added)
+++ pig/branches/spark/dev-support/docker/Dockerfile Fri Feb 24 08:19:42 2017
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dockerfile for installing the necessary dependencies for building Apache Pig.
+# See BUILDING.md.
+
+FROM ubuntu:trusty
+
+# Define working directory.
+WORKDIR /root
+
+# Install dependencies from packages.
+# The multiverse components are enabled *before* "apt-get update" so their
+# package indexes are actually fetched. Update, install and list cleanup share
+# a single RUN so that:
+#  - a stale cached "apt-get update" layer can never be paired with a newer install;
+#  - removing /var/lib/apt/lists really shrinks the image (a later layer cannot).
+RUN sed -i 's/# \(.*multiverse$\)/\1/g' /etc/apt/sources.list && \
+    apt-get update && \
+    apt-get install -y build-essential software-properties-common && \
+    apt-get install --no-install-recommends -y \
+            git subversion \
+            byobu htop man unzip vim \
+            cabal-install \
+            curl wget \
+            openjdk-7-jdk \
+            ant ant-contrib ant-optional make maven \
+            cmake gcc g++ protobuf-compiler \
+            build-essential libtool \
+            zlib1g-dev pkg-config libssl-dev \
+            snappy libsnappy-dev \
+            bzip2 libbz2-dev \
+            libjansson-dev \
+            fuse libfuse-dev \
+            libcurl4-openssl-dev \
+            python python2.7 && \
+    rm -rf /var/lib/apt/lists/*
+
+# JAVA_HOME for the OpenJDK 7 package; commonly read by the build tooling.
+ENV JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64
+
+# Work around the Apache Commons / Maven dependency problem under Ubuntu
+# (see http://wiki.apache.org/commons/VfsProblems) by linking the system
+# commons-lang jar into Maven's lib directory. The relative target is
+# resolved from /usr/share/maven/lib, i.e. /usr/share/java/commons-lang.jar.
+RUN ln -s ../../java/commons-lang.jar /usr/share/maven/lib/commons-lang.jar
+
+# Bound the Maven JVM heap to avoid out of memory errors in builds.
+ENV MAVEN_OPTS="-Xms256m -Xmx512m"
+
+# Install findbugs 3.0.1 into /opt/findbugs.
+# Downloaded over HTTPS; the archive is removed in the same RUN once unpacked
+# so it does not stay behind in the image layer.
+RUN mkdir -p /opt/findbugs && \
+    wget https://sourceforge.net/projects/findbugs/files/findbugs/3.0.1/findbugs-noUpdateChecks-3.0.1.tar.gz/download \
+         -O /opt/findbugs.tar.gz && \
+    tar xzf /opt/findbugs.tar.gz --strip-components 1 -C /opt/findbugs && \
+    rm /opt/findbugs.tar.gz
+ENV FINDBUGS_HOME /opt/findbugs
+
+# Install Apache Forrest 0.9 in /usr/local/apache-forrest.
+# The download URLs are screen-scraped from the Forrest mirror page, and every
+# matching archive is saved straight into /usr/local/.
+RUN curl https://forrest.apache.org/mirrors.cgi | \
+    fgrep href | fgrep apache-forrest-0.9 | \
+    sed 's@^.*"\(http[^"]*apache-forrest-[^"]*.tar.gz\)".*@\1@' | \
+    xargs -n1 -r wget -P /usr/local/
+
+# Unpack the sources plus the bundled dependencies, then give the tree a
+# version-independent name.
+RUN tar xzf /usr/local/apache-forrest-0.9-sources.tar.gz -C /usr/local/ && \
+    tar xzf /usr/local/apache-forrest-0.9-dependencies.tar.gz -C /usr/local/ && \
+    mv /usr/local/apache-forrest-0.9 /usr/local/apache-forrest
+RUN cd /usr/local/apache-forrest/main && ./build.sh
+
+# Work-around for https://issues.apache.org/jira/browse/PIG-3906:
+# make both Forrest plugin directories writable by all users.
+RUN for dir in /usr/local/apache-forrest/plugins /usr/local/apache-forrest/build/plugins; do \
+        mkdir -p "$dir" && chmod a+rwX -R "$dir" || exit 1; \
+    done
+
+# Record where Forrest lives, both in build.properties (written to the
+# current working directory) and in FORREST_HOME.
+RUN echo 'forrest.home=/usr/local/apache-forrest' > build.properties
+ENV FORREST_HOME=/usr/local/apache-forrest
+
+# Add a welcome message and environment checks, run from root's .bashrc.
+# COPY is used instead of ADD: these are plain local files, so ADD's extra
+# behaviors (remote URLs, automatic archive extraction) are not wanted.
+COPY build_env_checks.sh configure-for-user.sh /root/
+RUN chmod 755 /root/build_env_checks.sh /root/configure-for-user.sh && \
+    echo '~/build_env_checks.sh' >> /root/.bashrc

Added: pig/branches/spark/dev-support/docker/build_env_checks.sh
URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/build_env_checks.sh?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/dev-support/docker/build_env_checks.sh (added)
+++ pig/branches/spark/dev-support/docker/build_env_checks.sh Fri Feb 24 08:19:42 2017
@@ -0,0 +1,120 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -------------------------------------------------------
+function showWelcome {
+
+# http://patorjk.com/software/taag/#p=display&f=Doom&t=Pig%20Builder
+cat <<Welcome-message
+
+______ _        ______       _ _     _
+| ___ (_)       | ___ \\     (_) |   | |
+| |_/ /_  __ _  | |_/ /_   _ _| | __| | ___ _ __
+|  __/| |/ _\` | | ___ \\ | | | | |/ _\` |/ _ \\ '__|
+| |   | | (_| | | |_/ / |_| | | | (_| |  __/ |_
+\\_|   |_|\\__, | \\____/ \\__,_|_|_|\\__,_|\\___|_|
+          __/ |
+         |___/
+
+This is the standard Apache Pig Developer build environment.
+This has all the right tools installed required to build
+Pig from source.
+
+Welcome-message
+}
+
+# -------------------------------------------------------
+
+function showAbort {
+  cat <<Abort-message
+
+  ___  _                _   _
+ / _ \\| |              | | (_)
+/ /_\\ \\ |__   ___  _ __| |_ _ _ __   __ _
+|  _  | '_ \\ / _ \\| '__| __| | '_ \\ / _\` |
+| | | | |_) | (_) | |  | |_| | | | | (_| |
+\\_| |_/_.__/ \\___/|_|   \\__|_|_| |_|\\__, |
+                                     __/ |
+                                    |___/
+
+Abort-message
+}
+
+# -------------------------------------------------------
+
+function failIfUserIsRoot {
+    if [ "$(id -u)" -eq "0" ]; # If you are root then something went wrong.
+    then
+        cat <<End-of-message
+
+Apparently you are inside this docker container as the user root.
+Putting it simply:
+
+   This should not occur.
+
+Known possible causes of this are:
+1) Running this script as the root user ( Just don't )
+2) Running an old docker version ( upgrade to 1.4.1 or higher )
+
+End-of-message
+
+    showAbort
+
+    logout
+
+    fi
+}
+
+# -------------------------------------------------------
+
+# Configurable low water mark in GiB
+MINIMAL_MEMORY_GiB=2
+
+# Print a "Low Memory" banner when total memory is at most MINIMAL_MEMORY_GiB.
+function warnIfLowMemory {
+    MINIMAL_MEMORY=$((MINIMAL_MEMORY_GiB*1024*1024)) # Convert to KiB
+    # MemTotal in /proc/meminfo is reported in kB; a single anchored awk
+    # replaces the deprecated 'fgrep | awk' pipeline.
+    INSTALLED_MEMORY=$(awk '/^MemTotal:/ {print $2}' /proc/meminfo)
+    if [ $((INSTALLED_MEMORY)) -le $((MINIMAL_MEMORY)) ];
+    then
+        # Unquoted delimiter: the ${...}/$((...)) below are expanded, and the
+        # art's backslashes/backticks are escaped as \\ and \`.
+        cat <<End-of-message
+
+ _                    ___  ___
+| |                   |  \\/  |
+| |     _____      __ | .  . | ___ _ __ ___   ___  _ __ _   _
+| |    / _ \\ \\ /\\ / / | |\\/| |/ _ \\ '_ \` _ \\ / _ \\| '__| | | |
+| |___| (_) \\ V  V /  | |  | |  __/ | | | | | (_) | |  | |_| |
+\\_____/\\___/ \\_/\\_/   \\_|  |_/\\___|_| |_| |_|\\___/|_|   \\__, |
+                                                         __/ |
+                                                        |___/
+
+Your system is running on very little memory.
+This means it may work but it will most likely be slower than needed.
+
+If you are running this via boot2docker you can simply increase
+the available memory to at least ${MINIMAL_MEMORY_GiB} GiB (you have $((INSTALLED_MEMORY/(1024*1024))) GiB )
+
+End-of-message
+    fi
+}
+
+# -------------------------------------------------------
+
+showWelcome
+warnIfLowMemory
+failIfUserIsRoot
+
+# -------------------------------------------------------

Added: pig/branches/spark/dev-support/docker/configure-for-user.sh
URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/configure-for-user.sh?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/dev-support/docker/configure-for-user.sh (added)
+++ pig/branches/spark/dev-support/docker/configure-for-user.sh Fri Feb 24 08:19:42 2017
@@ -0,0 +1,40 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script is used to tweak the environment at the moment we know
+# the real username of the person using this.
+# By making this script a part of the image we can extend and update
+# it to fit future needs more easily.
+#
+# Usage: configure-for-user.sh USER_NAME USER_ID GROUP_ID [VBOXSF_GROUP_LINE]
+#   USER_NAME          name of the user (and of its primary group) to create
+#   USER_ID, GROUP_ID  numeric uid / gid for that user and group
+#   VBOXSF_GROUP_LINE  optional line appended to /etc/group; when given, the
+#                      user is also added to the "vboxsf" group
+
+# Native Linux (direct or via sudo)
+USER_NAME=$1
+USER_ID=$2
+GROUP_ID=$3
+VBOXSF_GROUP_LINE=$4
+
+if [ -z "${USER_NAME}" ] || [ -z "${USER_ID}" ] || [ -z "${GROUP_ID}" ];
+then
+    echo "Usage: $0 USER_NAME USER_ID GROUP_ID [VBOXSF_GROUP_LINE]" >&2
+    exit 1
+fi
+
+groupadd --non-unique -g "${GROUP_ID}" "${USER_NAME}"
+useradd -g "${GROUP_ID}" -u "${USER_ID}" -k /root -m "${USER_NAME}"
+echo "export HOME=/home/${USER_NAME}" >> ~/.bashrc
+echo "export USER=${USER_NAME}" >> ~/.bashrc
+
+# The expansion must be quoted: with an empty, unquoted variable the test
+# degenerates to '[ -n ]', which is always true, and a blank line would be
+# appended to /etc/group followed by a usermod for a possibly missing group.
+if [ -n "${VBOXSF_GROUP_LINE}" ];
+then
+    echo "${VBOXSF_GROUP_LINE}" >> /etc/group
+    usermod -aG vboxsf "${USER_NAME}"
+fi
+
+echo "${USER_NAME}    ALL=(ALL) NOPASSWD: ALL" >> "/etc/sudoers.d/${USER_NAME}"
+# Give the drop-in the read-only mode recommended for sudoers files instead
+# of whatever the umask produced.
+chmod 0440 "/etc/sudoers.d/${USER_NAME}"