Posted to commits@pig.apache.org by ro...@apache.org on 2017/01/07 00:11:38 UTC

svn commit: r1777738 [2/2] - in /pig/trunk: ./ bin/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ ivy/ shims/src/hadoop2/ shims/src/hadoop2/org/ shims/src/hadoop2/org/apache/ shims/src/hadoop2/org/apache...

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Sat Jan  7 00:11:37 2017
@@ -267,7 +267,7 @@ public class PigInputFormat extends Inpu
                                 jobcontext.getJobID()));
                 List<InputSplit> oneInputPigSplits = getPigSplits(
                         oneInputSplits, i, inpTargets.get(i),
-                        HadoopShims.getDefaultBlockSize(fs, isFsPath? path: fs.getWorkingDirectory()),
+                        fs.getDefaultBlockSize(isFsPath? path: fs.getWorkingDirectory()),
                         combinable, confClone);
                 splits.addAll(oneInputPigSplits);
             } catch (ExecException ee) {

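Where the shim layer previously abstracted over Hadoop 1.x and 2.x, the Hadoop 2 FileSystem API is now called directly. A minimal sketch of the pattern; the class name and location argument are illustrative, not part of the commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockSizeSketch {
        // Returns the default block size that applies to the given path,
        // replacing HadoopShims.getDefaultBlockSize(fs, path).
        static long defaultBlockSize(Configuration conf, String location) throws Exception {
            FileSystem fs = FileSystem.get(conf);
            // Hadoop 2's FileSystem exposes getDefaultBlockSize(Path) natively.
            return fs.getDefaultBlockSize(new Path(location));
        }
    }
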
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigOutputCommitter.java Sat Jan  7 00:11:37 2017
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -156,12 +155,7 @@ public class PigOutputCommitter extends
         for (Pair<OutputCommitter, POStore> mapCommitter : mapOutputCommitters) {
             if (mapCommitter.first!=null) {
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("isRecoverySupported");
-                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
-                            && (Boolean)m.invoke(mapCommitter.first);
-                } catch (NoSuchMethodException e) {
-                    allOutputCommitterSupportRecovery = false;
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && mapCommitter.first.isRecoverySupported();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -173,12 +167,7 @@ public class PigOutputCommitter extends
             reduceOutputCommitters) {
             if (reduceCommitter.first!=null) {
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("isRecoverySupported");
-                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery
-                            && (Boolean)m.invoke(reduceCommitter.first);
-                } catch (NoSuchMethodException e) {
-                    allOutputCommitterSupportRecovery = false;
+                    allOutputCommitterSupportRecovery = allOutputCommitterSupportRecovery && reduceCommitter.first.isRecoverySupported();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -197,10 +186,7 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 try {
                     // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
-                    m.invoke(mapCommitter.first, updatedContext);
-                } catch (NoSuchMethodException e) {
-                    // We are using Hadoop 1.x, ignore
+                    mapCommitter.first.recoverTask(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -212,11 +198,7 @@ public class PigOutputCommitter extends
                 TaskAttemptContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    // Use reflection, Hadoop 1.x line does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("recoverTask", TaskAttemptContext.class);
-                    m.invoke(reduceCommitter.first, updatedContext);
-                } catch (NoSuchMethodException e) {
-                    // We are using Hadoop 1.x, ignore
+                    reduceCommitter.first.recoverTask(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -256,10 +238,7 @@ public class PigOutputCommitter extends
                         mapCommitter.second);
                 // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("commitJob", JobContext.class);
-                    m.setAccessible(true);
-                    m.invoke(mapCommitter.first, updatedContext);
+                    mapCommitter.first.commitJob(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -273,10 +252,7 @@ public class PigOutputCommitter extends
                         reduceCommitter.second);
                 // PIG-2642 promote files before calling storeCleanup/storeSchema 
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("commitJob", JobContext.class);
-                    m.setAccessible(true);
-                    m.invoke(reduceCommitter.first, updatedContext);
+                    reduceCommitter.first.commitJob(updatedContext);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -293,10 +269,7 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         mapCommitter.second);
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = mapCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
-                    m.setAccessible(true);
-                    m.invoke(mapCommitter.first, updatedContext, state);
+                    mapCommitter.first.abortJob(updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -309,10 +282,7 @@ public class PigOutputCommitter extends
                 JobContext updatedContext = setUpContext(context,
                         reduceCommitter.second);
                 try {
-                    // Use reflection, 20.2 does not have such method
-                    Method m = reduceCommitter.first.getClass().getMethod("abortJob", JobContext.class, State.class);
-                    m.setAccessible(true);
-                    m.invoke(reduceCommitter.first, updatedContext, state);
+                    reduceCommitter.first.abortJob(updatedContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }

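All four committer hooks touched here (isRecoverySupported, recoverTask, commitJob, abortJob) are public methods on org.apache.hadoop.mapreduce.OutputCommitter in Hadoop 2, which is what makes the reflection removable. A hedged sketch of the resulting direct-call pattern; the class, method names, and plain committer list below are illustrative stand-ins for the Pair-based lists in PigOutputCommitter:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus.State;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class CommitterCallsSketch {
        // Recovery is only possible if every underlying committer supports it.
        static boolean allSupportRecovery(List<OutputCommitter> committers) {
            boolean all = true;
            for (OutputCommitter oc : committers) {
                all = all && oc.isRecoverySupported();
            }
            return all;
        }

        static void recoverAll(List<OutputCommitter> committers, TaskAttemptContext ctx) throws IOException {
            for (OutputCommitter oc : committers) {
                oc.recoverTask(ctx);   // formerly looked up via getMethod("recoverTask", ...)
            }
        }

        static void commitAll(List<OutputCommitter> committers, JobContext ctx) throws IOException {
            for (OutputCommitter oc : committers) {
                oc.commitJob(ctx);     // formerly invoked reflectively for Hadoop 0.20.2
            }
        }

        static void abortAll(List<OutputCommitter> committers, JobContext ctx, State state) throws IOException {
            for (OutputCommitter oc : committers) {
                oc.abortJob(ctx, state);
            }
        }
    }
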
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Sat Jan  7 00:11:37 2017
@@ -19,7 +19,6 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -43,6 +42,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -88,7 +88,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -285,7 +284,7 @@ public class TezDagBuilder extends TezOp
 
         try {
             fs = FileSystem.get(globalConf);
-            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc));
+            intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc));
         } catch (Exception e) {
             log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
             intermediateTaskInputSize = 134217728L;
@@ -1428,22 +1427,12 @@ public class TezDagBuilder extends TezOp
 
     private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
-        // can be wrapped with LazyOutputFormat provided if it is supported by
-        // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
+        // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set
         if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
-            try {
-                Class<?> clazz = PigContext
-                        .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
-                Method method = clazz.getMethod("setOutputFormatClass",
-                        org.apache.hadoop.mapreduce.Job.class, Class.class);
-                method.invoke(null, job, PigOutputFormatTez.class);
-            } catch (Exception e) {
-                job.setOutputFormatClass(PigOutputFormatTez.class);
-                log.warn(PigConfiguration.PIG_OUTPUT_LAZY
-                        + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
-            }
+            LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class);
         } else {
             job.setOutputFormatClass(PigOutputFormatTez.class);
         }
     }
+
 }

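LazyOutputFormat ships with every Hadoop version Pig now supports, so the PigContext.resolveClassName/reflection dance is gone and the class is referenced statically. A self-contained sketch of lazy output wrapping; TextOutputFormat stands in for PigOutputFormatTez here:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class LazyOutputSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            // LazyOutputFormat delays creating output files until the first
            // record is written, so empty tasks leave no empty part files.
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        }
    }
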
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Sat Jan  7 00:11:37 2017
@@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.hb
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -86,7 +86,6 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
 import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
@@ -787,46 +786,35 @@ public class HBaseStorage extends LoadFu
     public List<String> getShipFiles() {
         // Depend on HBase to do the right thing when available, as of HBASE-9165
         try {
-            Method addHBaseDependencyJars =
-              TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
-            if (addHBaseDependencyJars != null) {
-                Configuration conf = new Configuration();
-                addHBaseDependencyJars.invoke(null, conf);
-                if (conf.get("tmpjars") != null) {
-                    String[] tmpjars = conf.getStrings("tmpjars");
-                    List<String> shipFiles = new ArrayList<String>(tmpjars.length);
-                    for (String tmpjar : tmpjars) {
-                        shipFiles.add(new URL(tmpjar).getPath());
-                    }
-                    return shipFiles;
+            Configuration conf = new Configuration();
+            TableMapReduceUtil.addHBaseDependencyJars(conf);
+            if (conf.get("tmpjars") != null) {
+                String[] tmpjars = conf.getStrings("tmpjars");
+                List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+                for (String tmpjar : tmpjars) {
+                    shipFiles.add(new URL(tmpjar).getPath());
                 }
+                return shipFiles;
+            }
+        } catch (IOException e) {
+            if(e instanceof MalformedURLException){
+                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+                        + " had malformed url. Falling back to previous logic.", e);
+            }else {
+                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+                        + " failed. Falling back to previous logic.", e);
             }
-        } catch (NoSuchMethodException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
-              + " Falling back to previous logic.", e);
-        } catch (IllegalAccessException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
-              + " not permitted. Falling back to previous logic.", e);
-        } catch (InvocationTargetException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
-              + " failed. Falling back to previous logic.", e);
-        } catch (MalformedURLException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
-                    + " had malformed url. Falling back to previous logic.", e);
         }
 
         List<Class> classList = new ArrayList<Class>();
         classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
         classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
-        if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava
-            classList.add(com.google.common.collect.Lists.class); // guava
-        }
         classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
         addClassToList("org.cloudera.htrace.Trace", classList); // htrace
         addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
         addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
-        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
         addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
         return FuncUtils.getShipFiles(classList);
     }
@@ -877,27 +865,13 @@ public class HBaseStorage extends LoadFu
         }
 
         if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
-            // Will not be entering this block for 0.20.2 as it has no security.
             try {
-                // getCurrentUser method is not public in 0.20.2
-                Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
-                UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
-                // hasKerberosCredentials method not available in 0.20.2
-                Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
-                boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null);
-                if (hasKerberosCredentials) {
-                    // Class and method are available only from 0.92 security release
-                    Class tokenUtilClass = Class
-                            .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
-                    Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] {
-                            Configuration.class, UserGroupInformation.class, Job.class });
-                    m3.invoke(null, new Object[] { hbaseConf, currentUser, job });
+                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+                if (currentUser.hasKerberosCredentials()) {
+                    TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
                 } else {
                     LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
                 }
-            } catch (ClassNotFoundException cnfe) {
-                throw new RuntimeException("Failure loading TokenUtil class, "
-                        + "is secure RPC available?", cnfe);
             } catch (RuntimeException re) {
                 throw re;
             } catch (Exception e) {

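UserGroupInformation.getCurrentUser(), hasKerberosCredentials(), and HBase's TokenUtil are all public API on the supported Hadoop/HBase lines, hence the direct calls. A hedged sketch of the token fetch; the class name is illustrative, and "hbase.security.authentication" is assumed to be the key behind HBASE_SECURITY_CONF_KEY:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.security.token.TokenUtil;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.security.UserGroupInformation;

    public class HBaseTokenSketch {
        static void obtainTokenIfSecure(Configuration hbaseConf, Job job) throws Exception {
            if ("kerberos".equalsIgnoreCase(hbaseConf.get("hbase.security.authentication"))) {
                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
                if (currentUser.hasKerberosCredentials()) {
                    // Adds an HBase delegation token to the job's credentials.
                    TokenUtil.obtainTokenForJob(hbaseConf, currentUser, job);
                } // else: no Kerberos TGT, so no delegation token to fetch
            }
        }
    }
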
Modified: pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/HiveUDFBase.java Sat Jan  7 00:11:37 2017
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.Counters;
@@ -180,20 +181,9 @@ abstract class HiveUDFBase extends EvalF
 
     @Override
     public List<String> getShipFiles() {
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        Class hadoopVersionShimsClass;
-        try {
-            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
-                    hadoopVersion + "Shims");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
-        }
         List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
-                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, 
-                hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
+                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+                Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
         return files;
     }
 

Modified: pig/trunk/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/OrcStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/OrcStorage.java Sat Jan  7 00:11:37 2017
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -389,20 +390,8 @@ public class OrcStorage extends LoadFunc
 
     @Override
     public List<String> getShipFiles() {
-        List<String> cacheFiles = new ArrayList<String>();
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        Class hadoopVersionShimsClass;
-        try {
-            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
-                    hadoopVersion + "Shims");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
-        }
         Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
-                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
                 Input.class};
         return FuncUtils.getShipFiles(classList);
     }

Modified: pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/PigStorage.java Sat Jan  7 00:11:37 2017
@@ -68,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -412,7 +411,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-           && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+           && (!bzipinput_usehadoops) ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/trunk/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/builtin/TextLoader.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/trunk/src/org/apache/pig/builtin/TextLoader.java Sat Jan  7 00:11:37 2017
@@ -37,7 +37,6 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -259,8 +258,7 @@ public class TextLoader extends LoadFunc
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-           && !HadoopShims.isHadoopYARN()
-           && !bzipinput_usehadoops ) {
+                && !bzipinput_usehadoops ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Sat Jan  7 00:11:37 2017
@@ -90,7 +90,9 @@ public class PoissonSampleLoader extends
             // number of tuples to be skipped
             Tuple t = loader.getNext();
             if(t == null) {
-                return createNumRowTuple(null);
+                // since skipInterval is -1, no previous sample,
+                // and next sample is null -> the data set is empty
+                return null;
             }
             long availRedMem = (long) ( totalMemory * heapPerc);
             // availRedMem = 155084396;

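The fix changes what an empty input produces: previously a row-count marker tuple was emitted even when the very first read returned null; now the loader returns null so the sample is genuinely empty (exercised by testSkewedJoinMapLeftEmpty further down). A minimal sketch of the guard, with the class and method names purely illustrative:

    import org.apache.pig.data.Tuple;

    public class EmptySampleGuard {
        // With no previous sample taken (skipInterval still -1) and a null
        // first read, the data set is empty: emit nothing instead of a
        // num-row marker tuple.
        static Tuple firstSample(Tuple firstRead) {
            if (firstRead == null) {
                return null;
            }
            return firstRead; // otherwise sampling proceeds as before
        }
    }
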
Modified: pig/trunk/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/PigFile.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/PigFile.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/PigFile.java Sat Jan  7 00:11:37 2017
@@ -102,7 +102,7 @@ public class PigFile {
         if(oc.needsTaskCommit(tac)) {
             oc.commitTask(tac);
         }
-        HadoopShims.commitOrCleanup(oc, jc);
+        oc.commitJob(jc);
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/JarManager.java Sat Jan  7 00:11:37 2017
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.tools.bzip2r.BZip2Constants;
 import org.joda.time.DateTime;
@@ -66,7 +65,6 @@ public class JarManager {
         BZIP2R(BZip2Constants.class),
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
-        GUAVA(Multimaps.class),
         JODATIME(DateTime.class);
 
         private final Class pkgClass;
@@ -208,9 +206,6 @@ public class JarManager {
     public static List<String> getDefaultJars() {
         List<String> defaultJars = new ArrayList<String>();
         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
-            if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
-                continue; //Skip
-            }
             String jar = findContainingJar(pkgToSend.getPkgClass());
             if (!defaultJars.contains(jar)) {
                 defaultJars.add(jar);

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Sat Jan  7 00:11:37 2017
@@ -94,20 +94,6 @@ public class Utils {
     	  return System.getProperty("java.vendor").contains("IBM");
     }
 
-    public static boolean isHadoop23() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b0\\.23\\..+\\b"))
-            return true;
-        return false;
-    }
-
-    public static boolean isHadoop2() {
-        String version = org.apache.hadoop.util.VersionInfo.getVersion();
-        if (version.matches("\\b2\\.\\d+\\..+"))
-            return true;
-        return false;
-    }
-
     public static boolean is64bitJVM() {
         String arch = System.getProperties().getProperty("sun.arch.data.model",
                 System.getProperty("com.ibm.vm.bitmode"));

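isHadoop23()/isHadoop2() are removed because the answer is now always "Hadoop 2". Code that still needs the runtime version string can use Hadoop's own VersionInfo, which these helpers wrapped; a minimal sketch:

    import org.apache.hadoop.util.VersionInfo;

    public class HadoopVersionSketch {
        public static void main(String[] args) {
            // Prints e.g. "2.7.3"; Pig trunk now assumes the 2.x line throughout.
            System.out.println(VersionInfo.getVersion());
        }
    }
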
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Sat Jan  7 00:11:37 2017
@@ -24,7 +24,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 /**
@@ -71,7 +71,7 @@ public class PigStatsUtil {
      */
     @Deprecated
     public static final String FS_COUNTER_GROUP
-            = HadoopShims.getFsCounterGroupName();
+            = MRPigStatsUtil.FS_COUNTER_GROUP;
 
     /**
      * Returns an empty PigStats object Use of this method is not advised as it

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Sat Jan  7 00:11:37 2017
@@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.io.FileSpec;
@@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
 
+import org.python.google.common.collect.Lists;
+
 
 /**
  * This class encapsulates the runtime statistics of a MapReduce job.
@@ -281,7 +284,7 @@ public final class MRJobStats extends Jo
 
     void addCounters(Job job) {
         try {
-            counters = HadoopShims.getCounters(job);
+            counters = getCounters(job);
         } catch (IOException e) {
             LOG.warn("Unable to get job counters", e);
         }
@@ -349,13 +352,13 @@ public final class MRJobStats extends Jo
     void addMapReduceStatistics(Job job) {
         Iterator<TaskReport> maps = null;
         try {
-            maps = HadoopShims.getTaskReports(job, TaskType.MAP);
+            maps = getTaskReports(job, TaskType.MAP);
         } catch (IOException e) {
             LOG.warn("Failed to get map task report", e);
         }
         Iterator<TaskReport> reduces = null;
         try {
-            reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+            reduces = getTaskReports(job, TaskType.REDUCE);
         } catch (IOException e) {
             LOG.warn("Failed to get reduce task report", e);
         }
@@ -515,4 +518,35 @@ public final class MRJobStats extends Jo
         inputs.add(is);
     }
 
+    public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
+        if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
+            LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
+            return null;
+        }
+        Cluster cluster = new Cluster(job.getJobConf());
+        try {
+            org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+            if (mrJob == null) { // In local mode, mrJob will be null
+                mrJob = job.getJob();
+            }
+            org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
+            return Lists.newArrayList(reports).iterator();
+        } catch (InterruptedException ir) {
+            throw new IOException(ir);
+        }
+    }
+
+    public static Counters getCounters(Job job) throws IOException {
+        try {
+            Cluster cluster = new Cluster(job.getJobConf());
+            org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+            if (mrJob == null) { // In local mode, mrJob will be null
+                mrJob = job.getJob();
+            }
+            return new Counters(mrJob.getCounters());
+        } catch (Exception ir) {
+            throw new IOException(ir);
+        }
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Sat Jan  7 00:11:37 2017
@@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience.Private;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS
     public static final String TASK_COUNTER_GROUP
             = "org.apache.hadoop.mapred.Task$Counter";
     public static final String FS_COUNTER_GROUP
-            = HadoopShims.getFsCounterGroupName();
+            = "org.apache.hadoop.mapreduce.FileSystemCounter";
 
     private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
 

Modified: pig/trunk/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/build.xml?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/build.xml (original)
+++ pig/trunk/test/e2e/pig/build.xml Sat Jan  7 00:11:37 2017
@@ -27,9 +27,8 @@
   <property name="hive.lib.dir"
 	value="${pig.base.dir}/build/ivy/lib/Pig"/>
 
-  <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S">
-    <equals arg1="${hadoopversion}" arg2="23" />
-  </condition>
+  <property name="hadoopversion" value="2" />
+  <property name="hive.hadoop.shims.version" value="0.23" />
 
   <property name="mvnrepo" value="http://repo2.maven.org/maven2"/>
 

Modified: pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java (original)
+++ pig/trunk/test/org/apache/pig/TestLoadStoreFuncLifeCycle.java Sat Jan  7 00:11:37 2017
@@ -360,7 +360,7 @@ public class TestLoadStoreFuncLifeCycle
             // result, the number of StoreFunc instances is greater by 1 in
             // Hadoop-2.0.x.
             assertTrue("storer instanciation count increasing: " + Storer.count,
-                    Storer.count <= (org.apache.pig.impl.util.Utils.isHadoop2() ? 5 : 4));
+                    Storer.count <= 5);
         }
     }
 

Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java Sat Jan  7 00:11:37 2017
@@ -82,41 +82,38 @@ public class TestQueryParserUtils {
         QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
         assertEquals(null, props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
 
-        if(org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
-            // webhdfs
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
-            assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
-            assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
-            // har with webhfs
-            QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
-            assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
-            assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
-            assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
-            //viewfs
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
-            assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
-            assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
-            assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-
-            //har with viewfs
-            props.remove(MRConfiguration.JOB_HDFS_SERVERS);
-            QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
-            assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
-            QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
-            assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        // webhdfs
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
+        assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
+        assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+        // har with webhfs
+        QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
+        assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
+        assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
+        assertEquals("webhdfs://nn1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+        //viewfs
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
+        assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
+        assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
+        assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+
+        //har with viewfs
+        props.remove(MRConfiguration.JOB_HDFS_SERVERS);
+        QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
+        assertEquals("viewfs://", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
+        QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
+        assertEquals("viewfs://,viewfs://cluster1", props.getProperty(MRConfiguration.JOB_HDFS_SERVERS));
 
-
-        }
     }
 
 

Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Sat Jan  7 00:11:37 2017
@@ -43,7 +43,6 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -67,16 +66,10 @@ public class TestBZip {
 
     @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
     public static Iterable<Object[]> data() {
-        if ( HadoopShims.isHadoopYARN() ) {
-            return Arrays.asList(new Object[][] {
-                { false  },
-                { true   }
-            });
-        } else {
-            return Arrays.asList(new Object[][] {
-                { false }
-            });
-        }
+        return Arrays.asList(new Object[][] {
+            { false  },
+            { true   }
+        });
     }
 
     public TestBZip (Boolean useBzipFromHadoop) {

Modified: pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobControlCompiler.java Sat Jan  7 00:11:37 2017
@@ -63,7 +63,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -131,7 +130,7 @@ public class TestJobControlCompiler {
     // verifying the jar gets on distributed cache
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
     // guava jar is not shipped with Hadoop 2.x
-    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), HadoopShims.isHadoopYARN() ? 5 : 6, fileClassPaths.length);
+    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hotname in front
@@ -235,22 +234,12 @@ public class TestJobControlCompiler {
           // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
           System.out.println("cache.files= " + Arrays.toString(cacheURIs));
           System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
-          if (HadoopShims.isHadoopYARN()) {
-              // Default jars - 5 (pig, antlr, joda-time, automaton)
-              // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
-              Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
-                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
-              Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
-                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
-          } else {
-              // Default jars - 5. Has guava in addition
-              // There will be same entries duplicated for udf.jar and udf2.jar
-              Assert.assertEquals("size 12 for " + Arrays.toString(cacheURIs), 12,
-                      Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
-              Assert.assertEquals("size 12 for " + Arrays.toString(fileClassPaths), 12,
-                      Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
-          }
-
+          // Default jars - 5 (pig, antlr, joda-time, automaton)
+          // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
+          Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+                  Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
+          Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+                  Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
           // Count occurrences of the resources
           Map<String, Integer> occurrences = new HashMap<String, Integer>();
 
@@ -259,22 +248,12 @@ public class TestJobControlCompiler {
               val = (val == null) ? 1 : ++val;
               occurrences.put(cacheURI.toString(), val);
           }
-          if (HadoopShims.isHadoopYARN()) {
-              Assert.assertEquals(9, occurrences.size());
-          } else {
-              Assert.assertEquals(10, occurrences.size()); //guava jar in addition
-          }
+          Assert.assertEquals(9, occurrences.size());
 
           for (String file : occurrences.keySet()) {
-              if (!HadoopShims.isHadoopYARN() && (file.endsWith("udf.jar") || file.endsWith("udf2.jar"))) {
-                  // Same path added twice which is ok. It should not be a shipped to hdfs temp path.
-                  // We assert path is same by checking count
-                  Assert.assertEquals("Two occurrences for " + file, 2, (int) occurrences.get(file));
-              } else {
-                  // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
-                  // and second time through pig register jar when there is symlink
-                  Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
-              }
+              // check that only single occurrence even though we added once to dist cache (simulating via Oozie)
+              // and second time through pig register jar when there is symlink
+              Assert.assertEquals("One occurrence for " + file, 1, (int) occurrences.get(file));
           }
       }
 

Modified: pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Sat Jan  7 00:11:37 2017
@@ -45,12 +45,8 @@ public abstract class TestLoaderStorerSh
                 "store a into 'ooo';";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", 
-                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                "hive-shims-0.23", "hive-shims-common", "kryo"};
 
         checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }
@@ -61,12 +57,8 @@ public abstract class TestLoaderStorerSh
                 "store a into 'ooo' using OrcStorage;";
         PhysicalPlan pp = Util.buildPp(pigServer, query);
 
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", 
-                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
+        String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde",
+                "hive-shims-0.23", "hive-shims-common", "kryo"};
 
         checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }

Modified: pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java Sat Jan  7 00:11:37 2017
@@ -1558,14 +1558,7 @@ public class TestMultiQueryCompiler {
         MROperPlan mrp = null;
 
         try {
-            java.lang.reflect.Method compile = launcher.getClass()
-                    .getDeclaredMethod("compile",
-                            new Class[] { PhysicalPlan.class, PigContext.class });
-
-            compile.setAccessible(true);
-
-            mrp = (MROperPlan) compile.invoke(launcher, new Object[] { pp, myPig.getPigContext() });
-
+            mrp = launcher.compile(pp, myPig.getPigContext());
             Assert.assertNotNull(mrp);
 
         } catch (Exception e) {

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Sat Jan  7 00:11:37 2017
@@ -62,6 +62,7 @@ import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -763,10 +764,9 @@ public class TestPigRunner {
     }
 
     @Test
+    @Ignore
+    // Skip in hadoop 23 test, see PIG-2449
     public void classLoaderTest() throws Exception {
-        // Skip in hadoop 23 test, see PIG-2449
-        if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2())
-            return;
         PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
         w.println("register test/org/apache/pig/test/data/pigtestloader.jar");
         w.println("A = load '" + INPUT_FILE + "' using org.apache.pig.test.PigTestLoader();");

Modified: pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigStatsMR.java Sat Jan  7 00:11:37 2017
@@ -103,11 +103,7 @@ public class TestPigStatsMR extends Test
 
     private static MROperPlan getMRPlan(PhysicalPlan pp, PigContext ctx) throws Exception {
         MapReduceLauncher launcher = new MapReduceLauncher();
-        java.lang.reflect.Method compile = launcher.getClass()
-                .getDeclaredMethod("compile",
-                        new Class[] { PhysicalPlan.class, PigContext.class });
-        compile.setAccessible(true);
-        return (MROperPlan) compile.invoke(launcher, new Object[] { pp, ctx });
+        return launcher.compile(pp,ctx);
     }
 
     private static String getAlias(MapReduceOper mro) throws Exception {

Modified: pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Sat Jan  7 00:11:37 2017
@@ -65,6 +65,7 @@ public class TestSkewedJoin {
     private static final String INPUT_FILE5 = "SkewedJoinInput5.txt";
     private static final String INPUT_FILE6 = "SkewedJoinInput6.txt";
     private static final String INPUT_FILE7 = "SkewedJoinInput7.txt";
+    private static final String INPUT_FILE8 = "SkewedJoinInput8.txt";
     private static final String TEST_DIR = Util.getTestDirectory(TestSkewedJoin.class);
     private static final String INPUT_DIR = TEST_DIR + Path.SEPARATOR + "input";
     private static final String OUTPUT_DIR = TEST_DIR + Path.SEPARATOR + "output";
@@ -173,6 +174,10 @@
         }
         w7.close();
 
+        // Empty file
+        PrintWriter w8 = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE8));
+        w8.close();
+
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE2, INPUT_FILE2);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE3, INPUT_FILE3);
@@ -180,6 +186,7 @@ public class TestSkewedJoin {
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE5, INPUT_FILE5);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE6, INPUT_FILE6);
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE7, INPUT_FILE7);
+        Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE8, INPUT_FILE8);
     }
 
     private static void deleteFiles() throws IOException {
@@ -187,6 +194,20 @@
     }
 
     @Test
+    public void testSkewedJoinMapLeftEmpty() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE8 + "' as (idM:[]);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
+        pigServer.registerQuery("C = join A by idM#'id', B by id using 'skewed' PARALLEL 2;");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        int count = 0;
+        while (iter.hasNext()) {
+            count++;
+            iter.next();
+        }
+        assertEquals(0, count);
+    }
+
+    @Test
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");

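In the new testSkewedJoinMapLeftEmpty above, the left relation is loaded from the empty file with the untyped map schema idM:[], and the join key idM#'id' dereferences the value stored under key 'id' in that map; with no input tuples on the left side, the skewed join must produce zero results. A minimal standalone sketch of the same map-dereference join pattern, with hypothetical file names:

    // Hypothetical inputs: maps.txt holds a map column, rows.txt plain fields.
    pigServer.registerQuery("M = LOAD 'maps.txt' AS (m:[]);");
    pigServer.registerQuery("N = LOAD 'rows.txt' AS (k, v);");
    // m#'k' pulls the value stored under key 'k' out of the map field m.
    pigServer.registerQuery("J = JOIN M BY m#'k', N BY k;");
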
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Sat Jan  7 00:11:37 2017
@@ -75,7 +75,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.BagFactory;
@@ -648,13 +648,10 @@ public class Util {
          }
      }
 
-     static private String getMkDirCommandForHadoop2_0(String fileName) {
-         if (org.apache.pig.impl.util.Utils.isHadoop23() || org.apache.pig.impl.util.Utils.isHadoop2()) {
-             Path parentDir = new Path(fileName).getParent();
-             String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
-             return mkdirCommand;
-         }
-         return "";
+     static private String getFSMkDirCommand(String fileName) {
+         Path parentDir = new Path(fileName).getParent();
+         String mkdirCommand = parentDir.getName().isEmpty() ? "" : "fs -mkdir -p " + parentDir + "\n";
+         return mkdirCommand;
      }
 
     /**
@@ -676,7 +673,7 @@ public class Util {
             fileNameOnCluster = fileNameOnCluster.replace('\\','/');
         }
         PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
-        String script = getMkDirCommandForHadoop2_0(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
+        String script = getFSMkDirCommand(fileNameOnCluster) + "fs -put " + localFileName + " " + fileNameOnCluster;
         GruntParser parser = new GruntParser(new StringReader(script), ps);
         parser.setInteractive(false);
         try {
@@ -907,14 +904,7 @@ public class Util {
         MapRedUtil.checkLeafIsStore(pp, pc);
 
         MapReduceLauncher launcher = new MapReduceLauncher();
-
-        java.lang.reflect.Method compile = launcher.getClass()
-                .getDeclaredMethod("compile",
-                        new Class[] { PhysicalPlan.class, PigContext.class });
-
-        compile.setAccessible(true);
-
-        return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc });
+        return launcher.compile(pp, pc);
     }
 
     public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception {
@@ -1357,16 +1347,7 @@ public class Util {
 
         // For tez testing, we want to avoid TezResourceManager/LocalResource reuse
         // (when switching between local and mapreduce/tez)
-        if( HadoopShims.isHadoopYARN() ) {
-            try {
-                java.lang.reflect.Method tez_dropInstance = Class.forName(
-                  "org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager").getDeclaredMethod(
-                  "dropInstance", (Class<?>[]) null );
-                tez_dropInstance.invoke(null);
-            } catch (Exception e){
-                throw new RuntimeException(e);
-            }
-        }
+        TezResourceManager.dropInstance();
 
         // TODO: once we have Tez local mode, we can get rid of this. For now,
         // if we run this test suite in Tez mode and there are some tests

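With the YARN-only assumption now baked in, Util can import TezResourceManager and call dropInstance() directly instead of probing for the class at runtime. The removed code is the standard optional-class idiom; for reference, a minimal generic sketch of it, with hypothetical names:

    // Generic optional-class invocation (the pattern retired above):
    try {
        Class<?> cls = Class.forName("com.example.OptionalManager"); // hypothetical
        cls.getDeclaredMethod("dropInstance").invoke(null);          // static, no args
    } catch (ClassNotFoundException e) {
        // class not on this classpath: nothing to reset
    } catch (ReflectiveOperationException e) {
        throw new RuntimeException(e);
    }
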
Modified: pig/trunk/test/perf/pigmix/bin/generate_data.sh
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/bin/generate_data.sh?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/perf/pigmix/bin/generate_data.sh (original)
+++ pig/trunk/test/perf/pigmix/bin/generate_data.sh Sat Jan  7 00:11:37 2017
@@ -25,20 +25,11 @@ fi
 
 source $PIGMIX_HOME/conf/config.sh
 
-if [ $HADOOP_VERSION == "23" ]; then
-    echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
-    $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
-else
-    echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot"
-    $HADOOP_HOME/bin/hadoop fs -mkdir $hdfsroot
-fi
+echo "Going to run $HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot"
+$HADOOP_HOME/bin/hadoop fs -mkdir -p $hdfsroot
 
 shopt -s extglob
-if [ $HADOOP_VERSION == "23" ]; then
-    pigjar=`echo $PIG_HOME/pig*-h2.jar`
-else
-    pigjar=`echo $PIG_HOME/pig*-h1.jar`
-fi
+pigjar=`echo $PIG_HOME/pig*-h2.jar`
 
 pigmixjar=$PIGMIX_HOME/pigmix.jar
 

Modified: pig/trunk/test/perf/pigmix/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/test/perf/pigmix/build.xml?rev=1777738&r1=1777737&r2=1777738&view=diff
==============================================================================
--- pig/trunk/test/perf/pigmix/build.xml (original)
+++ pig/trunk/test/perf/pigmix/build.xml Sat Jan  7 00:11:37 2017
@@ -34,6 +34,8 @@
     </fileset>
   </path>
 
+  <property name="hadoopversion" value="2" />
+
   <property name="java.dir" value="${basedir}/src/java"/>
   <property name="pigmix.build.dir" value="${basedir}/build"/>
   <property name="pigmix.jar" value="${basedir}/pigmix.jar"/>