You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/24 08:19:46 UTC
svn commit: r1784237 [2/22] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelo...
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java Fri Feb 24 08:19:42 2017
@@ -21,6 +21,7 @@ package org.apache.pig.piggybank.evaluat
import java.io.IOException;
import java.util.Iterator;
+import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
@@ -43,7 +44,7 @@ import org.apache.pig.impl.logicalLayer.
*
* @author Vadim Zaliva <lo...@codemindes.com>
*/
-public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic
+public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic, Accumulator<Tuple>
{
/**
* Indicates once for how many items progress hartbeat should be sent.
@@ -131,6 +132,11 @@ public class MaxTupleBy1stField extends
protected static Tuple max(Tuple input, PigProgressable reporter) throws ExecException
{
DataBag values = (DataBag) input.get(0);
+ return max(values,reporter);
+ }
+
+ protected static Tuple max(DataBag values, PigProgressable reporter) throws ExecException
+ {
// if we were handed an empty bag, return NULL
// this is in compliance with SQL standard
@@ -183,4 +189,44 @@ public class MaxTupleBy1stField extends
return Final.class.getName();
}
+
+ /**
+ * Accumulator implementation
+ */
+
+ private Tuple intermediate = null;
+
+ /**
+ * Accumulator step: folds one more batch of tuples into the running maximum.
+ * The tuples of the incoming bag, together with the maximum kept from earlier calls (if any),
+ * are copied into a fresh bag and passed to max(); the result becomes the new intermediate tuple.
+ * @param b A tuple containing a single field, which is a bag. The bag holds the next batch of tuples to consider.
+ * @throws IOException if max() fails with an ExecException; that exception is kept as the cause.
+ */
+ @Override
+ public void accumulate(Tuple b) throws IOException {
+ try {
+ // Copy into a new bag so the caller's bag is not modified when the intermediate tuple is added.
+ DataBag values = BagFactory.getInstance().newDefaultBag();
+ values.addAll((DataBag) b.get(0));
+
+ if (intermediate != null) {
+ values.add(intermediate);
+ }
+ intermediate = max(values, reporter);
+
+ } catch (ExecException ee) {
+ // Wrap in one step so both the cause and its message are carried by the IOException.
+ throw new IOException(ee);
+ }
+ }
+
+ @Override
+ public Tuple getValue() {
+ return intermediate;
+ }
+
+ @Override
+ public void cleanup() {
+ intermediate = null;
+ }
+
}
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java Fri Feb 24 08:19:42 2017
@@ -23,10 +23,13 @@ import java.util.Iterator;
import java.util.List;
import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.BigDecimalAvg;
+import org.apache.pig.builtin.BigDecimalMax;
+import org.apache.pig.builtin.BigDecimalMin;
+import org.apache.pig.builtin.BigDecimalSum;
import org.apache.pig.builtin.COUNT;
import org.apache.pig.builtin.DoubleAvg;
import org.apache.pig.builtin.DoubleMax;
@@ -54,6 +57,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
/**
* Given an aggregate function, a bag, and possibly a window definition,
@@ -73,23 +77,27 @@ import org.apache.pig.impl.logicalLayer.
* <li>sum(int)</li>
* <li>sum(long)</li>
* <li>sum(bytearray)</li>
+ * <li>sum(bigdecimal)</li>
* <li>avg(double)</li>
* <li>avg(float)</li>
* <li>avg(long)</li>
* <li>avg(int)</li>
* <li>avg(bytearray)</li>
+ * <li>avg(bigdecimal)</li>
* <li>min(double)</li>
* <li>min(float)</li>
* <li>min(long)</li>
* <li>min(int)</li>
* <li>min(chararray)</li>
* <li>min(bytearray)</li>
+ * <li>min(bigdecimal)</li>
* <li>max(double)</li>
* <li>max(float)</li>
* <li>max(long)</li>
* <li>max(int)</li>
* <li>max(chararray)</li>
* <li>max(bytearray)</li>
+ * <li>max(bigdecimal)</li>
* <li>row_number</li>
* <li>first_value</li>
* <li>last_value</li>
@@ -153,7 +161,8 @@ import org.apache.pig.impl.logicalLayer.
* current row and 3 following) over T;</tt>
*
* <p>Over accepts a constructor argument specifying the name and type,
- * colon-separated, of its return schema.</p>
+ * colon-separated, of its return schema. If the argument is 'true' instead, Over performs an inner search:
+ * it takes the alias and type of the first innermost field of the input bag and returns a schema named alias+'_over' with the same type.</p>
*
* <p><pre>
* DEFINE IOver org.apache.pig.piggybank.evaluation.Over('state_rk:int');
@@ -188,12 +197,14 @@ public class Over extends EvalFunc<DataB
private Object[] udfArgs;
private byte returnType;
private String returnName;
+ private boolean searchInnerType;
public Over() {
initialized = false;
udfArgs = null;
func = null;
returnType = DataType.UNKNOWN;
+ searchInnerType = false;
}
public Over(String typespec) {
@@ -202,12 +213,16 @@ public class Over extends EvalFunc<DataB
String[] fn_tn = typespec.split(":", 2);
this.returnName = fn_tn[0];
this.returnType = DataType.findTypeByName(fn_tn[1]);
- } else {
+ } else if(Boolean.parseBoolean(typespec)) {
+ searchInnerType = Boolean.parseBoolean(typespec);
+ }else{
this.returnName = "result";
this.returnType = DataType.findTypeByName(typespec);
- }
+ }
}
+
+
@Override
public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() < 2) {
@@ -255,19 +270,42 @@ public class Over extends EvalFunc<DataB
@Override
public Schema outputSchema(Schema inputSch) {
try {
- if (returnType == DataType.UNKNOWN) {
+ FieldSchema field;
+
+ if (searchInnerType) {
+ field = new FieldSchema(inputSch.getField(0));
+ while (searchInnerType) {
+ if (field.schema != null
+ && field.schema.getFields().size() > 1) {
+ searchInnerType = false;
+ } else {
+ if (field.type == DataType.TUPLE
+ || field.type == DataType.BAG) {
+ field = new FieldSchema(field.schema.getField(0));
+ } else {
+ field.alias = field.alias + "_over";
+ searchInnerType = false;
+ }
+ }
+ }
+
+ searchInnerType = true;
+ } else if (returnType == DataType.UNKNOWN) {
return Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
} else {
- Schema outputTupleSchema = new Schema(new Schema.FieldSchema(returnName, returnType));
- return new Schema(new Schema.FieldSchema(
- getSchemaName(this.getClass().getName().toLowerCase(), inputSch),
- outputTupleSchema,
- DataType.BAG));
+ field = new Schema.FieldSchema(returnName, returnType);
}
+
+ Schema outputTupleSchema = new Schema(field);
+ return new Schema(new Schema.FieldSchema(getSchemaName(this
+ .getClass().getName().toLowerCase(), inputSch),
+ outputTupleSchema, DataType.BAG));
+
} catch (FrontendException fe) {
throw new RuntimeException("Unable to create nested schema", fe);
}
}
+
private void init(Tuple input) throws IOException {
initialized = true;
@@ -329,6 +367,8 @@ public class Over extends EvalFunc<DataB
func = new LongSum();
} else if ("sum(bytearray)".equalsIgnoreCase(agg)) {
func = new SUM();
+ } else if ("sum(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalSum();
} else if ("avg(double)".equalsIgnoreCase(agg)) {
func = new DoubleAvg();
} else if ("avg(float)".equalsIgnoreCase(agg)) {
@@ -339,6 +379,8 @@ public class Over extends EvalFunc<DataB
func = new IntAvg();
} else if ("avg(bytearray)".equalsIgnoreCase(agg)) {
func = new AVG();
+ } else if ("avg(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalAvg();
} else if ("min(double)".equalsIgnoreCase(agg)) {
func = new DoubleMin();
} else if ("min(float)".equalsIgnoreCase(agg)) {
@@ -351,6 +393,8 @@ public class Over extends EvalFunc<DataB
func = new StringMin();
} else if ("min(bytearray)".equalsIgnoreCase(agg)) {
func = new MIN();
+ } else if ("min(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalMin();
} else if ("max(double)".equalsIgnoreCase(agg)) {
func = new DoubleMax();
} else if ("max(float)".equalsIgnoreCase(agg)) {
@@ -363,6 +407,8 @@ public class Over extends EvalFunc<DataB
func = new StringMax();
} else if ("max(bytearray)".equalsIgnoreCase(agg)) {
func = new MAX();
+ } else if ("max(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalMax();
} else if ("row_number".equalsIgnoreCase(agg)) {
func = new RowNumber();
} else if ("first_value".equalsIgnoreCase(agg)) {
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachelogparser/SearchEngineExtractor.java Fri Feb 24 08:19:42 2017
@@ -363,6 +363,15 @@ public class SearchEngineExtractor exten
searchEngines.put("search.lycos.com", "Lycos");
searchEngines.put("search.msn.co.uk", "MSN UK");
searchEngines.put("search.msn.com", "MSN");
+ // Bing hosts; the label of a regional host names the country of its two-letter host prefix.
+ searchEngines.put("bing.com", "Bing");
+ searchEngines.put("ssl.bing.com", "Bing");
+ searchEngines.put("cn.bing.com", "Bing China");
+ searchEngines.put("br.bing.com", "Bing Brazil");
+ searchEngines.put("it.bing.com", "Bing Italy");
+ searchEngines.put("be.bing.com", "Bing Belgium");
+ searchEngines.put("uk.bing.com", "Bing UK");
+ searchEngines.put("hk.bing.com", "Bing Hong Kong");
+ searchEngines.put("nz.bing.com", "Bing New Zealand");
searchEngines.put("search.myway.com", "MyWay");
searchEngines.put("search.mywebsearch.com", "My Web Search");
searchEngines.put("search.ntlworld.com", "NTLWorld");
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/xml/XPath.java Fri Feb 24 08:19:42 2017
@@ -16,8 +16,11 @@ package org.apache.pig.piggybank.evaluat
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import javax.xml.XMLConstants;
+import javax.xml.namespace.NamespaceContext;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathFactory;
@@ -49,8 +52,7 @@ public class XPath extends EvalFunc<Stri
private static boolean cache = true;
private static boolean ignoreNamespace = true;
- public static final String EMPTY_STRING = "";
-
+
/**
* input should contain: 1) xml 2) xpath
* 3) optional cache xml doc flag
@@ -95,8 +97,13 @@ public class XPath extends EvalFunc<Stri
return null;
}
- if(input.size() > 2)
+ if(input.size() > 2) {
cache = (Boolean) input.get(2);
+ }
+
+ if (input.size() > 3) {
+ ignoreNamespace = (Boolean) input.get(3);
+ }
if (!cache || xpath == null || !xml.equals(this.xml)) {
final InputSource source = new InputSource(new StringReader(xml));
@@ -104,6 +111,7 @@ public class XPath extends EvalFunc<Stri
this.xml = xml; // track the xml for subsequent calls to this udf
final DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ dbf.setNamespaceAware(!ignoreNamespace);
final DocumentBuilder db = dbf.newDocumentBuilder();
this.document = db.parse(source);
@@ -112,14 +120,32 @@ public class XPath extends EvalFunc<Stri
this.xpath = xpathFactory.newXPath();
+ if (!ignoreNamespace) {
+ xpath.setNamespaceContext(new NamespaceContext() {
+ @Override
+ public String getNamespaceURI(String prefix) {
+ if (prefix.equals(XMLConstants.DEFAULT_NS_PREFIX)) {
+ return document.lookupNamespaceURI(null);
+ } else {
+ return document.lookupNamespaceURI(prefix);
+ }
+ }
+
+ @Override
+ public String getPrefix(String namespaceURI) {
+ return document.lookupPrefix(namespaceURI);
+ }
+
+ @Override
+ public Iterator getPrefixes(String namespaceURI) {
+ return null;
+ }
+ });
+ }
}
String xpathString = (String) input.get(1);
- if (ignoreNamespace) {
- xpathString = createNameSpaceIgnoreXpathString(xpathString);
- }
-
final String value = xpath.evaluate(xpathString, document);
return value;
@@ -165,34 +191,6 @@ public class XPath extends EvalFunc<Stri
}
return true;
}
-
-
- /**
- * Returns a new the xPathString by adding additional parameters
- * in the existing xPathString for ignoring the namespace during compilation.
- *
- * @param String xpathString
- * @return String modified xpathString
- */
- private String createNameSpaceIgnoreXpathString(final String xpathString) {
- final String QUERY_PREFIX = "//*";
- final String LOCAL_PREFIX = "[local-name()='";
- final String LOCAL_POSTFIX = "']";
- final String SPLITTER = "/";
-
- try {
- String xpathStringWithLocalName = EMPTY_STRING;
- String[] individualNodes = xpathString.split(SPLITTER);
-
- for (String node : individualNodes) {
- xpathStringWithLocalName = xpathStringWithLocalName.concat(QUERY_PREFIX + LOCAL_PREFIX + node
- + LOCAL_POSTFIX);
- }
- return xpathStringWithLocalName;
- } catch (Exception ex) {
- return xpathString;
- }
- }
/**
* Returns argument schemas of the UDF.
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java Fri Feb 24 08:19:42 2017
@@ -580,7 +580,7 @@ public class CSVExcelStorage extends Pig
}
} else if (b == DOUBLE_QUOTE) {
// Does a double quote immediately follow?
- if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE)) {
+ if ((i < recordLen-1) && (buf[i+1] == DOUBLE_QUOTE) && (fieldBuffer.position() != 0)) {
fieldBuffer.put(b);
nextTupleSkipChar = true;
continue;
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/DBStorage.java Fri Feb 24 08:19:42 2017
@@ -91,6 +91,7 @@ public class DBStorage extends StoreFunc
/**
* Write the tuple to Database directly here.
*/
+ @Override
public void putNext(Tuple tuple) throws IOException {
int sqlPos = 1;
try {
@@ -373,4 +374,9 @@ public class DBStorage extends StoreFunc
p.setProperty(SCHEMA_SIGNATURE, s.toString());
}
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return false;
+ }
+
}
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/IndexedStorage.java Fri Feb 24 08:19:42 2017
@@ -60,7 +60,6 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.util.StorageUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DataByteArray;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
/**
* <code>IndexedStorage</code> is a form of <code>PigStorage</code> that supports a
Modified: pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Fri Feb 24 08:19:42 2017
@@ -16,7 +16,9 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.NumberFormat;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,9 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.StorageUtil;
+import org.apache.xml.utils.StringBufferPool;
+
+import com.google.common.base.Strings;
/**
* The UDF is useful for splitting the output data into a bunch of directories
@@ -73,13 +78,21 @@ import org.apache.pig.impl.util.StorageU
* If the output is compressed,then the sub directories and the output files will
* be having the extension. Say for example in the above case if bz2 is used one file
* will look like ;/my/home/output.bz2/a1.bz2/a1-0000.bz2
+ *
+ * Key field can also be a comma separated list of indices e.g. '0,1' - in this case
+ * storage will be multi-level:
+ * /my/home/output/a1/b1/a1-b1-0000
+ * /my/home/output/a1/b2/a1-b2-0000
+ * There is also an option to leave key values out of storage, see isRemoveKeys.
*/
public class MultiStorage extends StoreFunc {
+ private static final String KEYFIELD_DELIMETER = ",";
private Path outputPath; // User specified output Path
- private int splitFieldIndex = -1; // Index of the key field
+ private final List<Integer> splitFieldIndices= new ArrayList<Integer>(); // Indices of the key fields
private String fieldDel; // delimiter of the output record.
private Compression comp; // Compression type of output data.
+ private boolean isRemoveKeys = false;
// Compression types supported by this store
enum Compression {
@@ -95,9 +108,14 @@ public class MultiStorage extends StoreF
this(parentPathStr, splitFieldIndex, compression, "\\t");
}
+ public MultiStorage(String parentPathStr, String splitFieldIndex,
+ String compression, String fieldDel) {
+ this(parentPathStr, splitFieldIndex, compression, fieldDel, "false");
+ }
+
/**
* Constructor
- *
+ *
* @param parentPathStr
* Parent output dir path (this will be specified in store statement,
* so MultiStorage don't use this parameter in reality. However, we don't
@@ -108,18 +126,26 @@ public class MultiStorage extends StoreF
* 'bz2', 'bz', 'gz' or 'none'
* @param fieldDel
* Output record field delimiter.
+ * @param isRemoveKeys
+ * Removes key columns from result during write.
*/
public MultiStorage(String parentPathStr, String splitFieldIndex,
- String compression, String fieldDel) {
+ String compression, String fieldDel, String isRemoveKeys) {
+ this.isRemoveKeys = Boolean.parseBoolean(isRemoveKeys);
this.outputPath = new Path(parentPathStr);
- this.splitFieldIndex = Integer.parseInt(splitFieldIndex);
+
+ // The key field may be a single index or a comma-separated list such as '0,1'.
+ // Each entry is trimmed so that '0, 1' is accepted as well.
+ String[] indexStrings = splitFieldIndex.split(KEYFIELD_DELIMETER);
+ for (String indexString : indexStrings){
+ this.splitFieldIndices.add(Integer.parseInt(indexString.trim()));
+ }
+
this.fieldDel = fieldDel;
try {
this.comp = (compression == null) ? Compression.none : Compression
- .valueOf(compression.toLowerCase());
+ .valueOf(compression.toLowerCase());
} catch (IllegalArgumentException e) {
// Unknown compression name: report it and fall back to writing uncompressed output.
System.err.println("Exception when converting compression string: "
- + compression + " to enum. No compression will be used");
+ + compression + " to enum. No compression will be used");
this.comp = Compression.none;
}
}
@@ -127,22 +153,26 @@ public class MultiStorage extends StoreF
//--------------------------------------------------------------------------
// Implementation of StoreFunc
- private RecordWriter<String, Tuple> writer;
+ private RecordWriter<List<String>, Tuple> writer;
@Override
public void putNext(Tuple tuple) throws IOException {
- if (tuple.size() <= splitFieldIndex) {
- throw new IOException("split field index:" + this.splitFieldIndex
- + " >= tuple size:" + tuple.size());
+ for (int splitFieldIndex : this.splitFieldIndices) {
+ if (tuple.size() <= splitFieldIndex) {
+ throw new IOException("split field index:" + splitFieldIndex
+ + " >= tuple size:" + tuple.size());
+ }
}
- Object field = null;
- try {
- field = tuple.get(splitFieldIndex);
- } catch (ExecException exec) {
- throw new IOException(exec);
+ List<String> fields = new ArrayList<String>();
+ for (int splitFieldIndex : this.splitFieldIndices){
+ try {
+ fields.add(String.valueOf(tuple.get(splitFieldIndex)));
+ } catch (ExecException exec) {
+ throw new IOException(exec);
+ }
}
try {
- writer.write(String.valueOf(field), tuple);
+ writer.write(fields, tuple);
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -153,6 +183,9 @@ public class MultiStorage extends StoreF
public OutputFormat getOutputFormat() throws IOException {
MultiStorageOutputFormat format = new MultiStorageOutputFormat();
format.setKeyValueSeparator(fieldDel);
+ if (this.isRemoveKeys){
+ format.setSkipIndices(this.splitFieldIndices);
+ }
return format;
}
@@ -174,27 +207,33 @@ public class MultiStorage extends StoreF
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
}
}
-
+
+ @Override
+ public Boolean supportsParallelWriteToStoreLocation() {
+ return false;
+ }
+
//--------------------------------------------------------------------------
// Implementation of OutputFormat
public static class MultiStorageOutputFormat extends
- TextOutputFormat<String, Tuple> {
+ TextOutputFormat<List<String>, Tuple> {
private String keyValueSeparator = "\\t";
private byte fieldDel = '\t';
-
+ private List<Integer> skipIndices = null;
+
@Override
- public RecordWriter<String, Tuple>
+ public RecordWriter<List<String>, Tuple>
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException {
final TaskAttemptContext ctx = context;
- return new RecordWriter<String, Tuple>() {
+ return new RecordWriter<List<String>, Tuple>() {
- private Map<String, MyLineRecordWriter> storeMap =
- new HashMap<String, MyLineRecordWriter>();
+ private Map<List<String>, MyLineRecordWriter> storeMap =
+ new HashMap<List<String>, MyLineRecordWriter>();
private static final int BUFFER_SIZE = 1024;
@@ -202,7 +241,7 @@ public class MultiStorage extends StoreF
new ByteArrayOutputStream(BUFFER_SIZE);
@Override
- public void write(String key, Tuple val) throws IOException {
+ public void write(List<String> key, Tuple val) throws IOException {
int sz = val.size();
for (int i = 0; i < sz; i++) {
Object field;
@@ -212,9 +251,13 @@ public class MultiStorage extends StoreF
throw ee;
}
- StorageUtil.putField(mOut, field);
+ boolean skipCurrentField = skipIndices != null && skipIndices.contains(i);
- if (i != sz - 1) {
+ if (!skipCurrentField) {
+ StorageUtil.putField(mOut, field);
+ }
+
+ if (i != sz - 1 && !skipCurrentField) {
mOut.write(fieldDel);
}
}
@@ -231,17 +274,17 @@ public class MultiStorage extends StoreF
}
}
- private MyLineRecordWriter getStore(String fieldValue) throws IOException {
- MyLineRecordWriter store = storeMap.get(fieldValue);
+ private MyLineRecordWriter getStore(List<String> fieldValues) throws IOException {
+ MyLineRecordWriter store = storeMap.get(fieldValues);
if (store == null) {
- DataOutputStream os = createOutputStream(fieldValue);
+ DataOutputStream os = createOutputStream(fieldValues);
store = new MyLineRecordWriter(os, keyValueSeparator);
- storeMap.put(fieldValue, store);
+ storeMap.put(fieldValues, store);
}
return store;
}
- private DataOutputStream createOutputStream(String fieldValue) throws IOException {
+ private DataOutputStream createOutputStream(List<String> fieldValues) throws IOException {
Configuration conf = ctx.getConfiguration();
TaskID taskId = ctx.getTaskAttemptID().getTaskID();
@@ -259,7 +302,21 @@ public class MultiStorage extends StoreF
NumberFormat nf = NumberFormat.getInstance();
nf.setMinimumIntegerDigits(4);
- Path path = new Path(fieldValue+extension, fieldValue + '-'
+ StringBuffer pathStringBuffer = new StringBuffer();
+ for (String fieldValue : fieldValues){
+ String safeFieldValue = fieldValue.replaceAll("\\/","-");
+ pathStringBuffer.append(safeFieldValue);
+ pathStringBuffer.append("/");
+ }
+ pathStringBuffer.deleteCharAt(pathStringBuffer.length()-1);
+ String pathString = pathStringBuffer.toString();
+ String idString = pathString.replaceAll("\\/","-");
+
+ if (!Strings.isNullOrEmpty(extension)){
+ pathString = pathString.replaceAll("\\/",extension+"\\/");
+ }
+
+ Path path = new Path(pathString+extension, idString + '-'
+ nf.format(taskId.getId())+extension);
Path workOutputPath = ((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath();
Path file = new Path(workOutputPath, path);
@@ -279,8 +336,12 @@ public class MultiStorage extends StoreF
keyValueSeparator = sep;
fieldDel = StorageUtil.parseFieldDel(keyValueSeparator);
}
-
- //------------------------------------------------------------------------
+
+ public void setSkipIndices(List<Integer> skipIndices) {
+ this.skipIndices = skipIndices;
+ }
+
+ //------------------------------------------------------------------------
//
protected static class MyLineRecordWriter
Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java Fri Feb 24 08:19:42 2017
@@ -18,12 +18,11 @@
package org.apache.pig.piggybank.evaluation;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.math.BigDecimal;
import java.util.Iterator;
-import java.util.List;
import java.util.Random;
import org.apache.pig.backend.executionengine.ExecException;
@@ -34,8 +33,6 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import org.junit.Before;
import org.junit.Test;
public class TestOver {
@@ -66,11 +63,25 @@ public class TestOver {
out = func.outputSchema(in);
assertEquals("{org.apache.pig.piggybank.evaluation.over_3: {result: double}}", out.toString());
+ // bigdecimal
+ func = new Over("BIGDECIMAL");
+ in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+ out = func.outputSchema(in);
+ assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {result: bigdecimal}}", out.toString());
+
// named
func = new Over("bob:chararray");
in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
out = func.outputSchema(in);
- assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {bob: chararray}}", out.toString());
+ assertEquals("{org.apache.pig.piggybank.evaluation.over_5: {bob: chararray}}", out.toString());
+
+
+ // Search inner alias and type
+ func = new Over("true");
+ in = Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL);
+ in.getField(0).schema.getField(0).alias="test";
+ out = func.outputSchema(in);
+ assertEquals("{org.apache.pig.piggybank.evaluation.over_6: {test_over: bigdecimal}}", out.toString());
}
@Test
@@ -397,6 +408,28 @@ public class TestOver {
assertEquals(new Long(10), to.get(0));
}
}
+
+ @Test
+ public void testSumBigDecimal() throws Exception {
+ Over func = new Over();
+ DataBag inbag = BagFactory.getInstance().newDefaultBag();
+ for (int i = 0; i < 10; i++) {
+ Tuple t = TupleFactory.getInstance().newTuple(1);
+ t.set(0, new BigDecimal(1));
+ inbag.add(t);
+ }
+ Tuple t = TupleFactory.getInstance().newTuple(4);
+ t.set(0, inbag);
+ t.set(1, "sum(bigdecimal)");
+ t.set(2, -1);
+ t.set(3, -1);
+ DataBag outbag = func.exec(t);
+ assertEquals(10, outbag.size());
+ for (Tuple to : outbag) {
+ assertEquals(1, to.size());
+ assertEquals(new BigDecimal(10), to.get(0));
+ }
+ }
@Test
public void testAvgDouble() throws Exception {
@@ -509,6 +542,29 @@ public class TestOver {
}
@Test
+ public void testAvgBigDecimal() throws Exception {
+ Over func = new Over();
+ DataBag inbag = BagFactory.getInstance().newDefaultBag();
+ for (int i = 0; i < 10; i++) {
+ Tuple t = TupleFactory.getInstance().newTuple(1);
+ t.set(0, new BigDecimal(i));
+ inbag.add(t);
+ }
+ Tuple t = TupleFactory.getInstance().newTuple(4);
+ t.set(0, inbag);
+ t.set(1, "avg(bigdecimal)");
+ t.set(2, -1);
+ t.set(3, -1);
+ DataBag outbag = func.exec(t);
+ assertEquals(10, outbag.size());
+ for (Tuple to : outbag) {
+ assertEquals(1, to.size());
+ assertEquals(new BigDecimal(4.5), to.get(0));
+ }
+ }
+
+
+ @Test
public void testMinDouble() throws Exception {
Over func = new Over();
DataBag inbag = BagFactory.getInstance().newDefaultBag();
@@ -627,6 +683,26 @@ public class TestOver {
assertEquals("0", to.get(0));
}
}
+
+ /**
+  * Runs "min(bigdecimal)" over rows holding 0..9, passing only the bag and
+  * the aggregate name, and expects every output row to hold BigDecimal 0.
+  */
+ @Test
+ public void testMinBigDecimal() throws Exception {
+     final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+     // Input bag: single-field rows holding BigDecimal 0 through 9, in order.
+     DataBag values = BagFactory.getInstance().newDefaultBag();
+     for (int v = 0; v <= 9; v++) {
+         Tuple row = tupleFactory.newTuple(1);
+         row.set(0, new BigDecimal(v));
+         values.add(row);
+     }
+
+     // Call tuple has just two fields: (bag, aggregate name).
+     Tuple call = tupleFactory.newTuple(2);
+     call.set(0, values);
+     call.set(1, "min(bigdecimal)");
+
+     DataBag minimums = new Over().exec(call);
+
+     assertEquals(10, minimums.size());
+     final BigDecimal expectedMin = new BigDecimal(0);
+     for (Tuple outRow : minimums) {
+         assertEquals(1, outRow.size());
+         assertEquals(expectedMin, outRow.get(0));
+     }
+ }
@Test
public void testMaxDouble() throws Exception {
@@ -754,6 +830,28 @@ public class TestOver {
assertEquals("9", to.get(0));
}
}
+
+ /**
+  * Runs "max(bigdecimal)" over rows holding 0..9, passing only the bag and
+  * the aggregate name, and expects the k-th output row (counting from 0)
+  * to hold BigDecimal k.
+  */
+ @Test
+ public void testMaxBigDecimal() throws Exception {
+     final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+     // Input bag: single-field rows holding BigDecimal 0 through 9, in order.
+     DataBag values = BagFactory.getInstance().newDefaultBag();
+     for (int v = 0; v <= 9; v++) {
+         Tuple row = tupleFactory.newTuple(1);
+         row.set(0, new BigDecimal(v));
+         values.add(row);
+     }
+
+     // Call tuple has just two fields: (bag, aggregate name).
+     Tuple call = tupleFactory.newTuple(2);
+     call.set(0, values);
+     call.set(1, "max(bigdecimal)");
+
+     DataBag maximums = new Over().exec(call);
+     assertEquals(10, maximums.size());
+
+     // Expected value grows by one per output row, starting at 0.
+     int expected = 0;
+     for (Tuple outRow : maximums) {
+         assertEquals(1, outRow.size());
+         assertEquals(new BigDecimal(expected), outRow.get(0));
+         expected++;
+     }
+ }
+
@Test
public void testRowNumber() throws Exception {
Added: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java (added)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestMaxTupleBy1stField.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,95 @@
+package org.apache.pig.piggybank.test.evaluation;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import com.google.common.collect.Lists;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.piggybank.evaluation.MaxTupleBy1stField;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Unit tests for {@link MaxTupleBy1stField}: one for the plain
+ * {@code exec} call and one for the accumulate / getValue / cleanup
+ * sequence.
+ */
+public class TestMaxTupleBy1stField {
+
+    /**
+     * Fixture rows of the form (long id, category, name, float value).
+     * Ids run 0..9 in list order, so the row with the largest first field
+     * in any slice is always the last row of that slice.
+     */
+    private static final List<Tuple> inputTuples = new ArrayList<>();
+    private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();
+    private static final BagFactory BAG_FACTORY = BagFactory.getInstance();
+
+    /** Fills the shared fixture list once before any test runs. */
+    @BeforeClass
+    public static void setup() throws Exception {
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(0L, "Fruit", "orange", 21F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(1L, "Fruit", "apple", 9.9F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(2L, "Vegetable", "paprika", 30F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(3L, "Fruit", "blueberry", 40F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(4L, "Vegetable", "carrot", 50F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(5L, "Fruit", "blueberry", 41F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(6L, "Vegetable", "paprika", 31F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(7L, "Fruit", "orange", 20.5F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(8L, "Fruit", "apple", 10.1F)));
+        inputTuples.add(TUPLE_FACTORY.newTuple(Lists.newArrayList(9L, "Fruit", "apple", 10.2F)));
+    }
+
+    /** exec over all ten rows must return the row with id 9 ("apple", 10.2). */
+    @Test
+    public void testExecFunc() throws Exception {
+        MaxTupleBy1stField udf = new MaxTupleBy1stField();
+        Tuple inputTuple = createTupleFromInputList(0, inputTuples.size());
+
+        Tuple result = udf.exec(inputTuple);
+        Assert.assertEquals("apple", result.get(2));
+        Assert.assertEquals(10.2F, (Float) result.get(3), 1E-8);
+    }
+
+    /**
+     * Feeds the UDF two batches and checks the running result after each,
+     * then checks that cleanup() resets the result to null.
+     */
+    @Test
+    public void testAccumulator() throws Exception {
+        MaxTupleBy1stField udf = new MaxTupleBy1stField();
+
+        // First batch: rows 0..2, so row 2 ("paprika", 30) is expected.
+        Tuple inputTuple = createTupleFromInputList(0, 3);
+        udf.accumulate(inputTuple);
+        Tuple result = udf.getValue();
+        Assert.assertEquals("paprika", result.get(2));
+        Assert.assertEquals(30F, (Float) result.get(3), 1E-6);
+
+        // Second batch: six rows starting at index 3, i.e. rows 3..8, so
+        // row 8 ("apple", 10.1) is expected.
+        inputTuple = createTupleFromInputList(3, 6);
+        udf.accumulate(inputTuple);
+        result = udf.getValue();
+        Assert.assertEquals("apple", result.get(2));
+        Assert.assertEquals(10.1F, (Float) result.get(3), 1E-6);
+
+        udf.cleanup();
+        Assert.assertNull(udf.getValue());
+    }
+
+    /**
+     * Wraps a slice of the fixture rows in a bag and returns a one-field
+     * tuple holding that bag.
+     *
+     * @param offset index of the first fixture row to include
+     * @param length number of consecutive rows to include (not an end index)
+     * @return a tuple whose only field is the bag of selected rows
+     */
+    private static Tuple createTupleFromInputList(int offset, int length) {
+        DataBag inputBag = BAG_FACTORY.newDefaultBag();
+        final int end = offset + length;
+        for (int i = offset; i < end; ++i) {
+            inputBag.add(inputTuples.get(i));
+        }
+        Tuple inputTuple = TUPLE_FACTORY.newTuple();
+        inputTuple.append(inputBag);
+        return inputTuple;
+    }
+
+}
Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/xml/XPathTest.java Fri Feb 24 08:19:42 2017
@@ -151,6 +151,27 @@ public class XPathTest {
}
@Test
+ public void testExecTupleWithDontIgnoreNamespace() throws Exception {
+     // Namespaced document queried with a namespace-qualified path; tuple
+     // field 2 is stubbed to true and field 3 to false.
+     final String xml = "<?xml version='1.0'?>\n" +
+             "<foo:document xmlns:foo=\"http://apache.org/foo\" xmlns:bar=\"http://apache.org/bar\">" +
+             "<bar:element>MyBar</bar:element>" +
+             "</foo:document>";
+
+     final Tuple input = mock(Tuple.class);
+     when(input.size()).thenReturn(4);
+     when(input.get(0)).thenReturn(xml);
+     when(input.get(1)).thenReturn("/foo:document/bar:element");
+     when(input.get(2)).thenReturn(true);
+     when(input.get(3)).thenReturn(false);
+
+     assertEquals("MyBar", new XPath().exec(input));
+ }
+
+ @Test
public void testExecTupleWithElementNodeWithComplexNameSpace() throws Exception {
final XPath xpath = new XPath();
@@ -210,7 +231,31 @@ public class XPathTest {
assertEquals("4 stars3.5 stars4 stars4.2 stars3.5 stars", xpath.exec(tuple));
}
-
+
+ /**
+  * Evaluates an expression that uses the sum() function over four
+  * Cc="1" attributes and expects the string "4".
+  */
+ @Test
+ public void testFunctionInXPath() throws Exception {
+     // Four <Bb> children, each with Cc="1", plus one unrelated <Dd> element.
+     final String xml = "<Aa name=\"test1\">" +
+             "<Bb Cc=\"1\"/>" +
+             "<Bb Cc=\"1\"/>" +
+             "<Bb Cc=\"1\"/>" +
+             "<Bb Cc=\"1\"/>" +
+             "<Dd>test2</Dd>" +
+             "</Aa>";
+
+     // Four-field tuple; fields 2 and 3 are both stubbed to true.
+     final Tuple input = mock(Tuple.class);
+     when(input.size()).thenReturn(4);
+     when(input.get(0)).thenReturn(xml);
+     when(input.get(1)).thenReturn("sum(Aa/Bb/@Cc)");
+     when(input.get(2)).thenReturn(true);
+     when(input.get(3)).thenReturn(true);
+
+     assertEquals("4", new XPath().exec(input));
+ }
+
@Ignore //--optional test
@Test
public void testCacheBenefit() throws Exception{
Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java Fri Feb 24 08:19:42 2017
@@ -218,7 +218,7 @@ public class TestCSVExcelStorage {
Util.registerMultiLineQuery(pig, script);
Iterator<Tuple> it = pig.openIterator("a");
Assert.assertEquals(Util.createTuple(new String[] {"foo,\"bar\",baz"}), it.next());
- Assert.assertEquals(Util.createTuple(new String[] {"\"\"\"\""}), it.next());
+ Assert.assertEquals(Util.createTuple(new String[] {"\"\"\""}), it.next());
}
// Handle newlines in fields
Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestLogFormatLoader.java Fri Feb 24 08:19:42 2017
@@ -109,7 +109,7 @@ public class TestLogFormatLoader {
Tuple actual = out.get(0);
Tuple expected = tuple(
"2001:980:91c0:1:8d31:a232:25e5:85d",
- "[05/Sep/2010:11:27:50 +0200]",
+ "05/Sep/2010:11:27:50 +0200",
"koken-pannen_303_hs-koken-pannen-afj-120601_B3_product_1_9200000002876066",
map(
"promo" , "koken-pannen_303_hs-koken-pannen-afj-120601_B3_product_1_9200000002876066",
Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java Fri Feb 24 08:19:42 2017
@@ -18,34 +18,41 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-import junit.framework.Assert;
-import junit.framework.TestCase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-public class TestMultiStorage extends TestCase {
+public class TestMultiStorage {
private static final String INPUT_FILE = "MultiStorageInput.txt";
private PigServer pigServer;
private PigServer pigServerLocal;
- private MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
- public TestMultiStorage() throws ExecException, IOException {
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- pigServerLocal = new PigServer(ExecType.LOCAL);
+ public TestMultiStorage() throws Exception {
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ pigServerLocal = new PigServer(Util.getLocalTestMode());
}
public static final PathFilter hiddenPathFilter = new PathFilter() {
@@ -74,59 +81,83 @@ public class TestMultiStorage extends Te
Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
}
- @Override
@Before
public void setUp() throws Exception {
createFile();
FileSystem fs = FileSystem.getLocal(new Configuration());
Path localOut = new Path("local-out");
- Path dummy = new Path("dummy");
if (fs.exists(localOut)) {
fs.delete(localOut, true);
}
- if (fs.exists(dummy)) {
- fs.delete(dummy, true);
- }
}
- @Override
@After
public void tearDown() throws Exception {
new File(INPUT_FILE).delete();
Util.deleteFile(cluster, INPUT_FILE);
+ }
+
+ @AfterClass
+ public static void shutdown() {
cluster.shutDown();
}
enum Mode {
local, cluster
- };
+ }
@Test
public void testMultiStorage() throws IOException {
final String LOAD = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);";
final String MULTI_STORE_CLUSTER = "STORE A INTO 'mr-out' USING "
+ "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');";
- final String MULTI_STORE_LOCAL = "STORE A INTO 'dummy' USING "
+ final String MULTI_STORE_LOCAL = "STORE A INTO 'local-out' USING "
+ "org.apache.pig.piggybank.storage.MultiStorage('local-out', '1');";
System.out.print("Testing in LOCAL mode: ...");
- //testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
+ testMultiStorage(Mode.local, "local-out", LOAD, MULTI_STORE_LOCAL);
System.out.println("Succeeded!");
-
+
System.out.print("Testing in CLUSTER mode: ...");
testMultiStorage( Mode.cluster, "mr-out", LOAD, MULTI_STORE_CLUSTER);
System.out.println("Succeeded!");
-
-
}
- /**
- * The actual method that run the test in local or cluster mode.
- *
- * @param pigServer
- * @param mode
- * @param queries
- * @throws IOException
+ /**
+  * Stores relation A and a filtered relation B through MultiStorage in one
+  * batch and checks the byte and record statistics reported for the two
+  * outputs.
+  *
+  * The output directories are removed in a finally block so that a failed
+  * assertion does not leave 'out1' and 'out2' behind.
+  */
+ @Test
+ public void testOutputStats() throws IOException {
+     FileSystem fs = cluster.getFileSystem();
+     Path out1 = new Path("out1");
+     Path out2 = new Path("out2");
+
+     try {
+         pigServer.setBatchOn();
+         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
+         pigServer.registerQuery("B = FILTER A BY name == 'apple';");
+         pigServer.registerQuery("STORE A INTO 'out1' USING org.apache.pig.piggybank.storage.MultiStorage('out1', '1');"); //153 bytes
+         pigServer.registerQuery("STORE B INTO 'out2' USING org.apache.pig.piggybank.storage.MultiStorage('out2', '1');"); // 45 bytes
+
+         ExecJob job = pigServer.executeBatch().get(0);
+
+         PigStats stats = job.getStatistics();
+         PigStats.JobGraph jobGraph = stats.getJobGraph();
+         JobStats jobStats = (JobStats) jobGraph.getSinks().get(0);
+         Map<String, Long> multiStoreCounters = jobStats.getMultiStoreCounters();
+         List<OutputStats> outputStats = SimplePigStats.get().getOutputStats();
+
+         // Check the count first, so a missing output shows up as an
+         // assertion failure rather than an IndexOutOfBoundsException below.
+         assertEquals(2, outputStats.size()); // 2 split conditions
+
+         // The order of the two entries is not fixed, so pick them by name.
+         OutputStats outputStats1 = "out1".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+         OutputStats outputStats2 = "out2".equals(outputStats.get(0).getName()) ? outputStats.get(0) : outputStats.get(1);
+
+         assertEquals(153 + 45, stats.getBytesWritten());
+         assertEquals(153, outputStats1.getBytes());
+         assertEquals(45, outputStats2.getBytes());
+         assertEquals(9, outputStats1.getRecords());
+         assertEquals(3, outputStats2.getRecords());
+         assertEquals(3L, multiStoreCounters.get("Output records in _1_out2").longValue());
+         assertEquals(9L, multiStoreCounters.get("Output records in _0_out1").longValue());
+     } finally {
+         fs.delete(out1, true);
+         fs.delete(out2, true);
+     }
+ }
+
+ /**
+ * The actual method that run the test in local or cluster mode.
*/
private void testMultiStorage( Mode mode, String outPath,
String... queries) throws IOException {
@@ -142,42 +173,38 @@ public class TestMultiStorage extends Te
/**
* Test if records are split into directories corresponding to split field
* values
- *
- * @param mode
- * @throws IOException
*/
private void verifyResults(Mode mode, String outPath) throws IOException {
FileSystem fs = (Mode.local == mode ? FileSystem
.getLocal(new Configuration()) : cluster.getFileSystem());
Path output = new Path(outPath);
- Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+ assertTrue("Output dir does not exists!", fs.exists(output)
&& fs.getFileStatus(output).isDir());
Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
- Assert.assertTrue("Split field dirs not found!", paths != null);
+ assertTrue("Split field dirs not found!", paths != null);
for (Path path : paths) {
String splitField = path.getName();
Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
- Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+ assertTrue("No files found for path: " + path.toUri().getPath(),
files != null);
for (Path filePath : files) {
- Assert.assertTrue("This shouldn't be a directory", fs.isFile(filePath));
-
+ assertTrue("This shouldn't be a directory", fs.isFile(filePath));
BufferedReader reader = new BufferedReader(new InputStreamReader(fs
.open(filePath)));
String line = "";
int count = 0;
while ((line = reader.readLine()) != null) {
String[] fields = line.split("\\t");
- Assert.assertEquals(fields.length, 3);
- Assert.assertEquals("Unexpected field value in the output record",
+ assertEquals(fields.length, 3);
+ assertEquals("Unexpected field value in the output record",
splitField, fields[1]);
count++;
System.out.println("field: " + fields[1]);
- }
+ }
reader.close();
- Assert.assertEquals(count, 3);
+ assertEquals(count, 3);
}
}
}
Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java Fri Feb 24 08:19:42 2017
@@ -21,11 +21,14 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import junit.framework.TestCase;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
@@ -37,6 +40,10 @@ import org.apache.pig.backend.executione
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.test.Util;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+
public class TestMultiStorageCompression extends TestCase {
private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
@@ -59,8 +66,8 @@ public class TestMultiStorageCompression
filesToDelete.add(outputPath);
try {
- runQuery(outputPath, type);
- verifyResults(type, filesToDelete, outputPath);
+ runQuery(outputPath, "0", type);
+ verifyResults(type, outputPath);
} finally {
cleanUpDirs(filesToDelete);
}
@@ -77,22 +84,22 @@ public class TestMultiStorageCompression
filesToDelete.add(outputPath);
try {
- runQuery(outputPath, type);
- verifyResults(type, filesToDelete, outputPath);
+ runQuery(outputPath, "0", type);
+ verifyResults(type, outputPath);
} finally {
cleanUpDirs(filesToDelete);
}
}
- private void cleanUpDirs(List<String> filesToDelete) {
+ private void cleanUpDirs(List<String> filesToDelete) throws IOException {
// Delete files recursively
Collections.reverse(filesToDelete);
for (String string : filesToDelete)
- new File(string).delete();
+ FileUtils.deleteDirectory(new File(string));
}
- private void verifyResults(String type, List<String> filesToDelete,
+ private void verifyResults(String type,
String outputPath) throws IOException, FileNotFoundException {
// Verify the output
File outputDir = new File(outputPath);
@@ -114,12 +121,10 @@ public class TestMultiStorageCompression
continue;
String topFolder = outputPath + File.separator + indexFolder;
File indexFolderFile = new File(topFolder);
- filesToDelete.add(topFolder);
String[] list = indexFolderFile.list();
for (String outputFile : list) {
String file = topFolder + File.separator + outputFile;
- filesToDelete.add(file);
// Skip off any file starting with .
if (outputFile.startsWith("."))
@@ -159,7 +164,7 @@ public class TestMultiStorageCompression
}
}
- private void runQuery(String outputPath, String compressionType)
+ private void runQuery(String outputPath, String keyColIndices, String compressionType)
throws Exception, ExecException, IOException, FrontendException {
// create a data file
@@ -172,7 +177,7 @@ public class TestMultiStorageCompression
String query2 = "STORE A INTO '" + Util.encodeEscape(outputPath)
+ "' USING org.apache.pig.piggybank.storage.MultiStorage" + "('"
- + Util.encodeEscape(outputPath) + "','0', '" + compressionType + "', '\\t');";
+ + Util.encodeEscape(outputPath) + "','"+keyColIndices+"', '" + compressionType + "', '\\t');";
// Run Pig
pig.setBatchOn();
@@ -182,5 +187,32 @@ public class TestMultiStorageCompression
pig.executeBatch();
}
+ public void testMultiStorageShouldSupportMultiLevelAndGz() throws Exception {
+     // Stores with key column indices "1,0" and gz compression, then checks
+     // that each expected nested output file exists under the output dir.
+     final String type = "gz";
+     final String outputDir = "output001.multi." + type;
+     final String outputPath =
+             System.getProperty("java.io.tmpdir") + File.separator + outputDir;
+
+     // Must stay a mutable list: cleanUpDirs reverses it in place.
+     List<String> filesToDelete = new ArrayList<String>();
+     filesToDelete.add(outputPath);
+
+     try {
+         runQuery(outputPath, "1,0", type);
+
+         Set<String> expectedPaths = Sets.newHashSet( "output001.multi.gz/a.gz/f1.gz/a-f1-0,000.gz",
+                 "output001.multi.gz/b.gz/f2.gz/b-f2-0,000.gz",
+                 "output001.multi.gz/c.gz/f3.gz/c-f3-0,000.gz",
+                 "output001.multi.gz/d.gz/f4.gz/d-f4-0,000.gz");
+
+         // Tick off every expected path that is found; removing a path that
+         // is not in the set is a no-op.
+         Collection<File> fileList = FileUtils.listFiles(new File(outputPath), null, true);
+         for (File file : fileList) {
+             String absolute = file.getAbsolutePath();
+             String foundPath = absolute.substring(absolute.indexOf(outputDir));
+             expectedPaths.remove(foundPath);
+         }
+
+         Assert.assertTrue(expectedPaths.isEmpty());
+     } finally {
+         cleanUpDirs(filesToDelete);
+     }
+ }
}
Added: pig/branches/spark/dev-support/docker/Dockerfile
URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/Dockerfile?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/dev-support/docker/Dockerfile (added)
+++ pig/branches/spark/dev-support/docker/Dockerfile Fri Feb 24 08:19:42 2017
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dockerfile for installing the necessary dependencies for building Apache Pig.
+# See BUILDING.md.
+
+FROM ubuntu:trusty
+
+# Define working directory.
+WORKDIR /root
+
+# Install dependencies from packages.
+# The package index is refreshed in the same layer as the install, and only
+# after multiverse has been enabled in sources.list. A separate
+# "RUN apt-get update" layer can be served from the build cache with stale
+# lists, and would have been fetched before multiverse was switched on.
+RUN sed -i 's/# \(.*multiverse$\)/\1/g' /etc/apt/sources.list && \
+    apt-get update && \
+    apt-get install -y build-essential && \
+    apt-get install -y software-properties-common && \
+    apt-get install --no-install-recommends -y \
+        git subversion \
+        byobu htop man unzip vim \
+        cabal-install \
+        curl wget \
+        openjdk-7-jdk \
+        ant ant-contrib ant-optional make maven \
+        cmake gcc g++ protobuf-compiler \
+        build-essential libtool \
+        zlib1g-dev pkg-config libssl-dev \
+        snappy libsnappy-dev \
+        bzip2 libbz2-dev \
+        libjansson-dev \
+        fuse libfuse-dev \
+        libcurl4-openssl-dev \
+        python python2.7 && \
+    rm -rf /var/lib/apt/lists/*
+
+# Define commonly used JAVA_HOME variable
+ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64
+
+# Fixing the Apache commons / Maven dependency problem under Ubuntu:
+# See http://wiki.apache.org/commons/VfsProblems
+RUN cd /usr/share/maven/lib && ln -s ../../java/commons-lang.jar .
+
+# Avoid out of memory errors in builds
+ENV MAVEN_OPTS -Xms256m -Xmx512m
+
+# Install findbugs
+RUN mkdir -p /opt/findbugs && \
+    wget http://sourceforge.net/projects/findbugs/files/findbugs/3.0.1/findbugs-noUpdateChecks-3.0.1.tar.gz/download \
+        -O /opt/findbugs.tar.gz && \
+    tar xzf /opt/findbugs.tar.gz --strip-components 1 -C /opt/findbugs
+ENV FINDBUGS_HOME /opt/findbugs
+
+# Install Forrest in /usr/local/apache-forrest
+# Screenscrape the download page for a local mirror URL
+RUN cd /usr/local/ && \
+    curl https://forrest.apache.org/mirrors.cgi | \
+    fgrep href | fgrep apache-forrest-0.9 | \
+    sed 's@^.*"\(http[^"]*apache-forrest-[^"]*.tar.gz\)".*@\1@' | \
+    xargs -n1 -r wget
+
+# Unpack Apache Forrest
+RUN cd /usr/local/ && \
+    tar xzf apache-forrest-0.9-sources.tar.gz && \
+    tar xzf apache-forrest-0.9-dependencies.tar.gz && \
+    mv apache-forrest-0.9 apache-forrest
+RUN cd /usr/local/apache-forrest/main && ./build.sh
+
+# The solution for https://issues.apache.org/jira/browse/PIG-3906
+RUN mkdir -p /usr/local/apache-forrest/plugins && chmod a+rwX -R /usr/local/apache-forrest/plugins
+RUN mkdir -p /usr/local/apache-forrest/build/plugins && chmod a+rwX -R /usr/local/apache-forrest/build/plugins
+
+# Configure where forrest can be found
+# (written relative to WORKDIR, i.e. /root/build.properties)
+RUN echo 'forrest.home=/usr/local/apache-forrest' > build.properties
+ENV FORREST_HOME /usr/local/apache-forrest
+
+# Add a welcome message and environment checks.
+# COPY is used because these are plain local files; none of ADD's extra
+# behaviour (URL fetch, archive extraction) is wanted here.
+COPY build_env_checks.sh /root/build_env_checks.sh
+RUN chmod 755 /root/build_env_checks.sh
+COPY configure-for-user.sh /root/configure-for-user.sh
+RUN chmod 755 /root/configure-for-user.sh
+RUN echo '~/build_env_checks.sh' >> /root/.bashrc
Added: pig/branches/spark/dev-support/docker/build_env_checks.sh
URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/build_env_checks.sh?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/dev-support/docker/build_env_checks.sh (added)
+++ pig/branches/spark/dev-support/docker/build_env_checks.sh Fri Feb 24 08:19:42 2017
@@ -0,0 +1,120 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -------------------------------------------------------
+function showWelcome {
+
+# http://patorjk.com/software/taag/#p=display&f=Doom&t=Pig%20Builder
+cat <<Welcome-message
+
+______ _ ______ _ _ _
+| ___ (_) | ___ \\ (_) | | |
+| |_/ /_ __ _ | |_/ /_ _ _| | __| | ___ _ __
+| __/| |/ _\` | | ___ \\ | | | | |/ _\` |/ _ \\ '__|
+| | | | (_| | | |_/ / |_| | | | (_| | __/ |_
+\\_| |_|\\__, | \\____/ \\__,_|_|_|\\__,_|\\___|_|
+ __/ |
+ |___/
+
+This is the standard Apache Pig Developer build environment.
+This has all the right tools installed required to build
+Pig from source.
+
+Welcome-message
+}
+
+# -------------------------------------------------------
+
+function showAbort {
+ cat <<Abort-message
+
+ ___ _ _ _
+ / _ \\| | | | (_)
+/ /_\\ \\ |__ ___ _ __| |_ _ _ __ __ _
+| _ | '_ \\ / _ \\| '__| __| | '_ \\ / _\` |
+| | | | |_) | (_) | | | |_| | | | | (_| |
+\\_| |_/_.__/ \\___/|_| \\__|_|_| |_|\\__, |
+ __/ |
+ |___/
+
+Abort-message
+}
+
+# -------------------------------------------------------
+
+function failIfUserIsRoot {
+ if [ "$(id -u)" -eq "0" ]; # If you are root then something went wrong.
+ then
+ cat <<End-of-message
+
+Apparently you are inside this docker container as the user root.
+Putting it simply:
+
+ This should not occur.
+
+Known possible causes of this are:
+1) Running this script as the root user ( Just don't )
+2) Running an old docker version ( upgrade to 1.4.1 or higher )
+
+End-of-message
+
+ showAbort
+
+ logout
+
+ fi
+}
+
+# -------------------------------------------------------
+
+# Configurable low water mark in GiB
+MINIMAL_MEMORY_GiB=2
+
+function warnIfLowMemory {
+ MINIMAL_MEMORY=$((MINIMAL_MEMORY_GiB*1024*1024)) # Convert to KiB
+ INSTALLED_MEMORY=$(fgrep MemTotal /proc/meminfo | awk '{print $2}')
+ if [ $((INSTALLED_MEMORY)) -le $((MINIMAL_MEMORY)) ];
+ then
+ cat <<End-of-message
+
+ _ ___ ___
+| | | \\/ |
+| | _____ __ | . . | ___ _ __ ___ ___ _ __ _ _
+| | / _ \\ \\ /\\ / / | |\\/| |/ _ \\ '_ \` _ \\ / _ \\| '__| | | |
+| |___| (_) \\ V V / | | | | __/ | | | | | (_) | | | |_| |
+\\_____/\\___/ \\_/\\_/ \\_| |_/\\___|_| |_| |_|\\___/|_| \\__, |
+ __/ |
+ |___/
+
+Your system is running on very little memory.
+This means it may work but it wil most likely be slower than needed.
+
+If you are running this via boot2docker you can simply increase
+the available memory to atleast ${MINIMAL_MEMORY_GiB} GiB (you have $((INSTALLED_MEMORY/(1024*1024))) GiB )
+
+End-of-message
+ fi
+}
+
+# -------------------------------------------------------
+
+showWelcome
+warnIfLowMemory
+failIfUserIsRoot
+
+# -------------------------------------------------------
Added: pig/branches/spark/dev-support/docker/configure-for-user.sh
URL: http://svn.apache.org/viewvc/pig/branches/spark/dev-support/docker/configure-for-user.sh?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/dev-support/docker/configure-for-user.sh (added)
+++ pig/branches/spark/dev-support/docker/configure-for-user.sh Fri Feb 24 08:19:42 2017
@@ -0,0 +1,40 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script is used to tweak the environment at the moment we know
+# the real username of the person using this.
+# By making this script a part of the image we can extend and update
+# it to fit future needs more easily.
+
+# Native Linux (direct or via sudo)
+# Positional arguments:
+#   $1  user name to create inside the container
+#   $2  numeric user id for that user
+#   $3  numeric group id for that user's group
+#   $4  optional line to append to /etc/group; when given, the user is
+#       also added to the vboxsf group
+USER_NAME=$1
+USER_ID=$2
+GROUP_ID=$3
+
+# Create a group and a user with the given ids. The new home directory is
+# populated from /root (-k /root -m).
+groupadd --non-unique -g "${GROUP_ID}" "${USER_NAME}"
+useradd -g "${GROUP_ID}" -u "${USER_ID}" -k /root -m "${USER_NAME}"
+echo "export HOME=/home/${USER_NAME}" >> ~/.bashrc
+echo "export USER=${USER_NAME}" >> ~/.bashrc
+
+VBOXSF_GROUP_LINE=$4
+# The expansion must be quoted: unquoted and empty, the test collapses to
+# '[ -n ]', which is always true, and the branch would run with no line.
+if [ -n "${VBOXSF_GROUP_LINE}" ];
+then
+  echo "${VBOXSF_GROUP_LINE}" >> /etc/group
+  usermod -aG vboxsf "${USER_NAME}"
+fi
+
+# Allow the new user to run any command through sudo without a password.
+echo "${USER_NAME} ALL=(ALL) NOPASSWD: ALL" >> "/etc/sudoers.d/${USER_NAME}"