You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/07 00:11:38 UTC
svn commit: r1777738 [2/2] - in /pig/trunk: ./ bin/ contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ ivy/
shims/src/hadoop2/ shims/src/hadoop2/org/ shims/src/hadoop2/org/apache/
shims/src/hadoop2/org/apache...
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Sat Jan 7 00:11:37 2017
@@ -267,7 +267,7 @@ public class PigInputFormat extends Inpu
jobcontext.getJobID()));
List<InputSplit> oneInputPigSplits = getPigSplits(
oneInputSplits, i, inpTargets.get(i),
- HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+ fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
combinable, confClone);
splits.addAll(oneInputPigSplits);
} catch (ExecException ee) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Sat Jan 7 00:11:37 2017
@@ -18,7 +18,6 @@
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
@@ -156,12 +155,7 @@ public class PigOutputCommitter extends
for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
if (mapCommitter.first!=null) {
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
- && (Boolean)m.invoke(mapCommitter.first);
- } catch (NoSuchMethodException e) {
- allOutputCommitterSupportRecovery = false;
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -173,12 +167,7 @@ public class PigOutputCommitter extends
reduceOutputCommitters) {
if (reduceCommitter.first!=null) {
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
- allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
- && (Boolean)m.invoke(reduceCommitter.first);
- } catch (NoSuchMethodException e) {
- allOutputCommitterSupportRecovery = false;
+ allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported();
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -197,10 +186,7 @@ public class PigOutputCommitter extends
mapCommitter.second);
try {
// Use reflection, Hadoop 1.x line does not have such method
- Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
- m.invoke(mapCommitter.first, updatedContext);
- } catch (NoSuchMethodException e) {
- // We are using Hadoop 1.x, ignore
+ mapCommitter.first.recoverTask(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -212,11 +198,7 @@ public class PigOutputCommitter extends
TaskAttemptContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- // Use reflection, Hadoop 1.x line does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
- m.invoke(reduceCommitter.first, updatedContext);
- } catch (NoSuchMethodException e) {
- // We are using Hadoop 1.x, ignore
+ reduceCommitter.first.recoverTask(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -256,10 +238,7 @@ public class PigOutputCommitter extends
mapCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- // Use reflection, 20.2 does not have such method
- Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
- m.setAccessible(true);
- m.invoke(mapCommitter.first, updatedContext);
+ mapCommitter.first.commitJob(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -273,10 +252,7 @@ public class PigOutputCommitter extends
reduceCommitter.second);
// PIG-2642 promote files before calling storeCleanup/storeSchema
try {
- // Use reflection, 20.2 does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
- m.setAccessible(true);
- m.invoke(reduceCommitter.first, updatedContext);
+ reduceCommitter.first.commitJob(updatedContext);
} catch (Exception e) {
throw new IOException(e);
}
@@ -293,10 +269,7 @@ public class PigOutputCommitter extends
JobContext updatedContext = setUpContext(context,
mapCommitter.second);
try {
- // Use reflection, 20.2 does not have such method
- Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
- m.setAccessible(true);
- m.invoke(mapCommitter.first, updatedContext, state);
+ mapCommitter.first.abortJob(updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
@@ -309,10 +282,7 @@ public class PigOutputCommitter extends
JobContext updatedContext = setUpContext(context,
reduceCommitter.second);
try {
- // Use reflection, 20.2 does not have such method
- Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
- m.setAccessible(true);
- m.invoke(reduceCommitter.first, updatedContext, state);
+ reduceCommitter.first.abortJob(updatedContext, state);
} catch (Exception e) {
throw new IOException(e);
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat Jan 7 00:11:37 2017
@@ -19,7 +19,6 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
import java.io.IOException;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -43,6 +42,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -88,7 +88,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -285,7 +284,7 @@ public class TezDagBuilder extends TezOp
try {
fs = FileSystem.get(globalConf);
- intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc));
+ intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
} catch (Exception e) {
log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
intermediateTaskInputSize = 134217728L;
@@ -1428,22 +1427,12 @@ public class TezDagBuilder extends TezOp
private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
// the OutputFormat we report to Hadoop is always PigOutputFormat which
- // can be wrapped with LazyOutputFormat provided if it is supported by
- // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+ // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
- try {
- Class<?> clazz = PigContext
- .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
- Method method = clazz.getMethod("setOutputFormatClass",
- org.apache.hadoop.mapreduce.Job.class, Class.class);
- method.invoke(null, job, PigOutputFormatTez.class);
- } catch (Exception e) {
- job.setOutputFormatClass(PigOutputFormatTez.class);
- log.warn(PigConfiguration.PIG_OUTPUT_LAZY
- + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
- }
+ LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class);
} else {
job.setOutputFormatClass(PigOutputFormatTez.class);
}
}
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Jan 7 00:11:37 2017
@@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.hb
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
import java.lang.reflect.UndeclaredThrowableException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
@@ -86,7 +86,6 @@ import org.apache.pig.ResourceSchema.Res
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
import org.apache.pig.builtin.FuncUtils;
import org.apache.pig.builtin.Utf8StorageConverter;
@@ -787,46 +786,35 @@ public class HBaseStorage extends LoadFu
public List<String> getShipFiles() {
// Depend on HBase to do the right thing when available, as of HBASE-9165
try {
- Method addHBaseDependencyJars =
- TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
- if (addHBaseDependencyJars != null) {
- Configuration conf = new Configuration();
- addHBaseDependencyJars.invoke(null, conf);
- if (conf.get("tmpjars") != null) {
- String[] tmpjars = conf.getStrings("tmpjars");
- List<String> shipFiles = new ArrayList<String>(tmpjars.length);
- for (String tmpjar : tmpjars) {
- shipFiles.add(new URL(tmpjar).getPath());
- }
- return shipFiles;
+ Configuration conf = new Configuration();
+ TableMapReduceUtil.addHBaseDependencyJars(conf);
+ if (conf.get("tmpjars") != null) {
+ String[] tmpjars = conf.getStrings("tmpjars");
+ List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+ for (String tmpjar : tmpjars) {
+ shipFiles.add(new URL(tmpjar).getPath());
}
+ return shipFiles;
+ }
+ } catch (IOException e) {
+ if(e instanceof MalformedURLException){
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+ + " had malformed url. Falling back to previous logic.", e);
+ }else {
+ LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+ + " failed. Falling back to previous logic.", e);
}
- } catch (NoSuchMethodException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
- + " Falling back to previous logic.", e);
- } catch (IllegalAccessException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
- + " not permitted. Falling back to previous logic.", e);
- } catch (InvocationTargetException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
- + " failed. Falling back to previous logic.", e);
- } catch (MalformedURLException e) {
- LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
- + " had malformed url. Falling back to previous logic.", e);
}
List<Class> classList = new ArrayList<Class>();
classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
- if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava
- classList.add(com.google.common.collect.Lists.class); // guava
- }
classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
// Additional jars that are specific to v0.95.0+
addClassToList("org.cloudera.htrace.Trace", classList); // htrace
addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
- addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+ addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
return FuncUtils.getShipFiles(classList);
}
@@ -877,27 +865,13 @@ public class HBaseStorage extends LoadFu
}
if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
- // Will not be entering this block for 0.20.2 as it has no security.
try {
- // getCurrentUser method is not public in 0.20.2
- Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
- UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
- // hasKerberosCredentials method not available in 0.20.2
- Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
- boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null);
- if (hasKerberosCredentials) {
- // Class and method are available only from 0.92 security release
- Class tokenUtilClass = Class
- .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
- Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] {
- Configuration.class, UserGroupInformation.class, Job.class });
- m3.invoke(null, new Object[] { hbaseConf, currentUser, job });
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ if (currentUser.hasKerberosCredentials()) {
+ TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
} else {
LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
}
- } catch (ClassNotFoundException cnfe) {
- throw new RuntimeException("Failure loading TokenUtil class, "
- + "is secure RPC available?", cnfe);
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
Modified: pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java Sat Jan 7 00:11:37 2017
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.Counters;
@@ -180,20 +181,9 @@ abstract class HiveUDFBase extends EvalF
@Override
public List<String> getShipFiles() {
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- Class hadoopVersionShimsClass;
- try {
- hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
- hadoopVersion + "Shims");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
- }
List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
- PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
- hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
+ PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+ Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
return files;
}
Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Sat Jan 7 00:11:37 2017
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.hive.shims.HadoopShimsSecure;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
@@ -389,20 +390,8 @@ public class OrcStorage extends LoadFunc
@Override
public List<String> getShipFiles() {
- List<String> cacheFiles = new ArrayList<String>();
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- Class hadoopVersionShimsClass;
- try {
- hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
- hadoopVersion + "Shims");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
- }
Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
- org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+ org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
Input.class};
return FuncUtils.getShipFiles(classList);
}
Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Sat Jan 7 00:11:37 2017
@@ -68,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
@@ -412,7 +411,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
@Override
public InputFormat getInputFormat() {
if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
- && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+ && (!bzipinput_usehadoops) ) {
mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
Modified: pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TextLoader.java Sat Jan 7 00:11:37 2017
@@ -37,7 +37,6 @@ import org.apache.pig.ResourceSchema.Res
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
@@ -259,8 +258,7 @@ public class TextLoader extends LoadFunc
@Override
public InputFormat getInputFormat() {
if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
- && !HadoopShims.isHadoopYARN()
- && !bzipinput_usehadoops ) {
+ && !bzipinput_usehadoops ) {
mLog.info("Using Bzip2TextInputFormat");
return new Bzip2TextInputFormat();
} else {
Modified: pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Sat Jan 7 00:11:37 2017
@@ -90,7 +90,9 @@ public class PoissonSampleLoader extends
// number of tuples to be skipped
Tuple t = loader.getNext();
if(t == null) {
- return createNumRowTuple(null);
+ // since skipInterval is -1, no previous sample,
+ // and next sample is null -> the data set is empty
+ return null;
}
long availRedMem = (long) ( totalMemory * heapPerc);
// availRedMem = 155084396;
Modified: pig/trunk/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/PigFile.java Sat Jan 7 00:11:37 2017
@@ -102,7 +102,7 @@ public class PigFile {
if(oc.needsTaskCommit(tac)) {
oc.commitTask(tac);
}
- HadoopShims.commitOrCleanup(oc, jc);
+ oc.commitJob(jc);
}
@Override
Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Sat Jan 7 00:11:37 2017
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.impl.PigContext;
import org.apache.tools.bzip2r.BZip2Constants;
import org.joda.time.DateTime;
@@ -66,7 +65,6 @@ public class JarManager {
BZIP2R(BZip2Constants.class),
AUTOMATON(Automaton.class),
ANTLR(CommonTokenStream.class),
- GUAVA(Multimaps.class),
JODATIME(DateTime.class);
private final Class pkgClass;
@@ -208,9 +206,6 @@ public class JarManager {
public static List<String> getDefaultJars() {
List<String> defaultJars = new ArrayList<String>();
for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
- if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
- continue; //Skip
- }
String jar = findContainingJar(pkgToSend.getPkgClass());
if (!defaultJars.contains(jar)) {
defaultJars.add(jar);
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Sat Jan 7 00:11:37 2017
@@ -94,20 +94,6 @@ public class Utils {
return System.getProperty("java.vendor").contains("IBM");
}
- public static boolean isHadoop23() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b0\\.23\\..+\\b"))
- return true;
- return false;
- }
-
- public static boolean isHadoop2() {
- String version = org.apache.hadoop.util.VersionInfo.getVersion();
- if (version.matches("\\b2\\.\\d+\\..+"))
- return true;
- return false;
- }
-
public static boolean is64bitJVM() {
String arch = System.getProperties().getProperty("sun.arch.data.model",
System.getProperty("com.ibm.vm.bitmode"));
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Sat Jan 7 00:11:37 2017
@@ -24,7 +24,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
/**
@@ -71,7 +71,7 @@ public class PigStatsUtil {
*/
@Deprecated
public static final String FS_COUNTER_GROUP
- = HadoopShims.getFsCounterGroupName();
+ = MRPigStatsUtil.FS_COUNTER_GROUP;
/**
* Returns an empty PigStats object Use of this method is not advised as it
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Sat Jan 7 00:11:37 2017
@@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigCounters;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.io.FileSpec;
@@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.python.google.common.collect.Lists;
+
/**
* This class encapsulates the runtime statistics of a MapReduce job.
@@ -281,7 +284,7 @@ public final class MRJobStats extends Jo
void addCounters(Job job) {
try {
- counters = HadoopShims.getCounters(job);
+ counters = getCounters(job);
} catch (IOException e) {
LOG.warn("Unable to get job counters", e);
}
@@ -349,13 +352,13 @@ public final class MRJobStats extends Jo
void addMapReduceStatistics(Job job) {
Iterator<TaskReport> maps = null;
try {
- maps = HadoopShims.getTaskReports(job, TaskType.MAP);
+ maps = getTaskReports(job, TaskType.MAP);
} catch (IOException e) {
LOG.warn("Failed to get map task report", e);
}
Iterator<TaskReport> reduces = null;
try {
- reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ reduces = getTaskReports(job, TaskType.REDUCE);
} catch (IOException e) {
LOG.warn("Failed to get reduce task report", e);
}
@@ -515,4 +518,35 @@ public final class MRJobStats extends Jo
inputs.add(is);
}
+ public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
+ if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
+ LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
+ return null;
+ }
+ Cluster cluster = new Cluster(job.getJobConf());
+ try {
+ org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+ if (mrJob == null) { // In local mode, mrJob will be null
+ mrJob = job.getJob();
+ }
+ org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
+ return Lists.newArrayList(reports).iterator();
+ } catch (InterruptedException ir) {
+ throw new IOException(ir);
+ }
+ }
+
+ public static Counters getCounters(Job job) throws IOException {
+ try {
+ Cluster cluster = new Cluster(job.getJobConf());
+ org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+ if (mrJob == null) { // In local mode, mrJob will be null
+ mrJob = job.getJob();
+ }
+ return new Counters(mrJob.getCounters());
+ } catch (Exception ir) {
+ throw new IOException(ir);
+ }
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Sat Jan 7 00:11:37 2017
@@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience.Private;
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.JobStats;
@@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS
public static final String TASK_COUNTER_GROUP
= "org.apache.hadoop.mapred.Task$Counter";
public static final String FS_COUNTER_GROUP
- = HadoopShims.getFsCounterGroupName();
+ = "org.apache.hadoop.mapreduce.FileSystemCounter";
private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
Modified: pig/trunk/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/build.xml (original)
+++ pig/trunk/test/e2e/pig/build.xml Sat Jan 7 00:11:37 2017
@@ -27,9 +27,8 @@
<property name="hive.lib.dir"
value="${pig.base.dir}/build/ivy/lib/Pig"/>
- <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S">
- <equals arg1="${hadoopversion}" arg2="23" />
- </condition>
+ <property name="hadoopversion" value="2" />
+ <property name="hive.hadoop.shims.version" value="0.23" />
<property name="mvnrepo" value="http://repo2.maven.org/maven2"/>
Modified: pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original)
+++ pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Sat Jan 7 00:11:37 2017
@@ -360,7 +360,7 @@ public class TestLoadStoreFuncLifeCycle
// result, the number of StoreFunc instances is greater by 1 in
// Hadoop-2.0.x.
assertTrue("storer instanciation count increasing: " + Storer.count,
- Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4));
+ Storer.count <= 5);
}
}
Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java Sat Jan 7 00:11:37 2017
@@ -82,41 +82,38 @@ public class TestQueryParserUtils {
QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
- // webhdfs
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
- assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
- assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
- // har with webhfs
- QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
- assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
- assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
- assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
- //viewfs
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
- assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
- assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
- assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
- //har with viewfs
- props.remove(MRConfiguration.JOB_HDFS_SERVERS);
- QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
- assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
- QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
- assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ // webhdfs
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
+ assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+ // har with webhdfs
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
+ assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
+ assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+ //viewfs
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
+ assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
+ assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
+ assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+ //har with viewfs
+ props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+ QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
+ assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+ QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
+ assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
- }
}
Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Sat Jan 7 00:11:37 2017
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
@@ -67,16 +66,10 @@ public class TestBZip {
@Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
public static Iterable<Object[]> data() {
- if ( HadoopShims.isHadoopYARN() ) {
- return Arrays.asList(new Object[][] {
- { false },
- { true }
- });
- } else {
- return Arrays.asList(new Object[][] {
- { false }
- });
- }
+ return Arrays.asList(new Object[][] {
+ { false },
+ { true }
+ });
}
public TestBZip (Boolean useBzipFromHadoop) {
Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Sat Jan 7 00:11:37 2017
@@ -63,7 +63,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
@@ -131,7 +130,7 @@ public class TestJobControlCompiler {
// verifying the jar gets on distributed cache
Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
// guava jar is not shipped with Hadoop 2.x
- Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
+ Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
Path distributedCachePath = fileClassPaths[0];
Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
// hadoop bug requires path to not contain hdfs://hotname in front
@@ -235,22 +234,12 @@ public class TestJobControlCompiler {
// 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
System.out.println("cache.files= " + Arrays.toString(cacheURIs));
System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
- if (HadoopShims.isHadoopYARN()) {
- // Default jars - 5 (pig, antlr, joda-time, automaton)
- // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
- Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
- Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
- Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
- } else {
- // Default jars - 5. Has guava in addition
- // There will be same entries duplicated for udf.jar and udf2.jar
- Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
- Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
- Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
- }
-
+ // Default jars - 4 (pig, antlr, joda-time, automaton)
+ // Other jars - 5 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar)
+ Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+ Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+ Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+ Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
// Count occurrences of the resources
Map<String, Integer> occurrences = new HashMap<String, Integer>();
@@ -259,22 +248,12 @@ public class TestJobControlCompiler {
val = (val == null) ? 1 : ++val;
occurrences.put(cacheURI.toString(), val);
}
- if (HadoopShims.isHadoopYARN()) {
- Assert.assertEquals(9, occurrences.size());
- } else {
- Assert.assertEquals(10, occurrences.size()); //guava jar in addition
- }
+ Assert.assertEquals(9, occurrences.size());
for (String file : occurrences.keySet()) {
- if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
- // Same path added twice which is ok. It should not be a shipped to hdfs temp path.
- // We assert path is same by checking count
- Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
- } else {
- // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
- // and second time through pig register jar when there is symlink
- Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
- }
+ // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
+ // and second time through pig register jar when there is symlink
+ Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Sat Jan 7 00:11:37 2017
@@ -45,12 +45,8 @@ public abstract class TestLoaderStorerSh
"store a into 'ooo';";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
}
@@ -61,12 +57,8 @@ public abstract class TestLoaderStorerSh
"store a into 'ooo' using OrcStorage;";
PhysicalPlan pp = Util.buildPp(pigServer, query);
- String hadoopVersion = "20S";
- if (Utils.isHadoop23() || Utils.isHadoop2()) {
- hadoopVersion = "23";
- }
- String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
- "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+ String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+ "hive-shims-0.23", "hive-shims-common", "kryo"};
checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
}
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Sat Jan 7 00:11:37 2017
@@ -1558,14 +1558,7 @@ public class TestMultiQueryCompiler {
MROperPlan mrp = null;
try {
- java.lang.reflect.Method compile = launcher.getClass()
- .getDeclaredMethod("compile",
- new Class[] { PhysicalPlan.class, PigContext.class });
-
- compile.setAccessible(true);
-
- mrp = (MROperPlan) compile.invoke(launcher, new Object[] { pp, myPig.getPigContext() });
-
+ mrp = launcher.compile(pp, myPig.getPigContext());
Assert.assertNotNull(mrp);
} catch (Exception e) {
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Sat Jan 7 00:11:37 2017
@@ -62,6 +62,7 @@ import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -763,10 +764,9 @@ public class TestPigRunner {
}
@Test
+ @Ignore
+ // Skip in hadoop 23 test, see PIG-2449
public void classLoaderTest() throws Exception {
- // Skip in hadoop 23 test, see PIG-2449
- if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
- return;
PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();");
Modified: pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java Sat Jan 7 00:11:37 2017
@@ -103,11 +103,7 @@ public class TestPigStatsMR extends Test
private static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception {
MapReduceLauncher launcher = new MapReduceLauncher();
- java.lang.reflect.Method compile = launcher.getClass()
- .getDeclaredMethod("compile",
- new Class[] { PhysicalPlan.class, PigContext.class });
- compile.setAccessible(true);
- return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx });
+ return launcher.compile(pp,ctx);
}
private static String getAlias(MapReduceOper mro) throws Exception {
Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Sat Jan 7 00:11:37 2017
@@ -65,6 +65,7 @@ public class TestSkewedJoin {
private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
+ private static final String INPUT_FILE8 = "SkewedJoinInput8.txt";
private static final String TEST_DIR = Util.getTestDirectory(TestSkewedJoin.class);
private static final String INPUT_DIR = TEST_DIR + Path.SEPARATOR + "input";
private static final String OUTPUT_DIR = TEST_DIR + Path.SEPARATOR + "output";
@@ -173,6 +174,11 @@ public class TestSkewedJoin {
}
w7.close();
+ //Empty file
+ PrintWriter w8 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE8));
+ w8.close();
+
+
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
@@ -180,6 +186,7 @@ public class TestSkewedJoin {
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5, INPUT_FILE5);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6, INPUT_FILE6);
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7, INPUT_FILE7);
+ Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE8, INPUT_FILE8);
}
private static void deleteFiles() throws IOException {
@@ -187,6 +194,21 @@ public class TestSkewedJoin {
}
@Test
+ public void testSkewedJoinMapLeftEmpty() throws IOException{
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE8 + "' as (idM:[]);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+ pigServer.registerQuery("C = join A by idM#'id', B by id using 'skewed' PARALLEL 2;");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+ int count = 0;
+ while(iter.hasNext()) {
+ count++;
+ iter.next();
+ }
+ assertEquals(0, count);
+ }
+
+
+ @Test
public void testSkewedJoinWithGroup() throws IOException{
pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Sat Jan 7 00:11:37 2017
@@ -75,7 +75,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.builtin.Utf8StorageConverter;
import org.apache.pig.data.BagFactory;
@@ -648,13 +648,10 @@ public class Util {
}
}
- static private String getMkDirCommandForHadoop2_0(String fileName) {
- if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
- Path parentDir = new Path(fileName).getParent();
- String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
- return mkdirCommand;
- }
- return "";
+ static private String getFSMkDirCommand(String fileName) {
+ Path parentDir = new Path(fileName).getParent();
+ String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
+ return mkdirCommand;
}
/**
@@ -676,7 +673,7 @@ public class Util {
fileNameOnCluster = fileNameOnCluster.replace('\\','/');
}
PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
- String script = getMkDirCommandForHadoop2_0(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
+ String script = getFSMkDirCommand(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
GruntParser parser = new GruntParser(new StringReader(script), ps);
parser.setInteractive(false);
try {
@@ -907,14 +904,7 @@ public class Util {
MapRedUtil.checkLeafIsStore(pp, pc);
MapReduceLauncher launcher = new MapReduceLauncher();
-
- java.lang.reflect.Method compile = launcher.getClass()
- .getDeclaredMethod("compile",
- new Class[] { PhysicalPlan.class, PigContext.class });
-
- compile.setAccessible(true);
-
- return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc });
+ return launcher.compile(pp,pc);
}
public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception {
@@ -1357,16 +1347,7 @@ public class Util {
// For tez testing, we want to avoid TezResourceManager/LocalResource reuse
// (when switching between local and mapreduce/tez)
- if( HadoopShims.isHadoopYARN() ) {
- try {
- java.lang.reflect.Method tez_dropInstance = Class.forName(
- "org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager").getDeclaredMethod(
- "dropInstance", (Class<?>[]) null );
- tez_dropInstance.invoke(null);
- } catch (Exception e){
- throw new RuntimeException(e);
- }
- }
+ TezResourceManager.dropInstance();
// TODO: once we have Tez local mode, we can get rid of this. For now,
// if we run this test suite in Tez mode and there are some tests
Modified: pig/trunk/test/perf/pigmix/bin/generate_data.sh
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/bin/generate_data.sh?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/perf/pigmix/bin/generate_data.sh (original)
+++ pig/trunk/test/perf/pigmix/bin/generate_data.sh Sat Jan 7 00:11:37 2017
@@ -25,20 +25,11 @@ fi
source $PIGMIX_HOME/conf/config.sh
-if [ $HADOOP_VERSION == "23" ]; then
- echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
- $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
-else
- echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot"
- $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot
-fi
+echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
+$HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
shopt -s extglob
-if [ $HADOOP_VERSION == "23" ]; then
- pigjar=`echo $PIG_HOME/pig*-h2.jar`
-else
- pigjar=`echo $PIG_HOME/pig*-h1.jar`
-fi
+pigjar=`echo $PIG_HOME/pig*-h2.jar`
pigmixjar=$PIGMIX_HOME/pigmix.jar
Modified: pig/trunk/test/perf/pigmix/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/build.xml?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/perf/pigmix/build.xml (original)
+++ pig/trunk/test/perf/pigmix/build.xml Sat Jan 7 00:11:37 2017
@@ -34,6 +34,8 @@
</fileset>
</path>
+ <property name="hadoopversion" value="2" />
+
<property name="java.dir" value="${basedir}/src/java"/>
<property name="pigmix.build.dir" value="${basedir}/build"/>
<property name="pigmix.jar" value="${basedir}/pigmix.jar"/>