You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC

svn commit: r1598702 [12/23] - in /pig/trunk: ./ ivy/ shims/src/hadoop23/org/apache/pig/backend/hadoop23/ shims/test/hadoop20/org/apache/pig/test/ shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/ src/org/apache/pig/ src/org/apache/pig/ba...

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri May 30 19:07:23 2014
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.JarManager;
+
+/**
+ * Keeps track of the files a Pig-on-Tez job registers as resources. Every
+ * resource is stored under a name and mapped to a path on the remote file
+ * system; files given as URLs are first copied into the staging directory.
+ */
+public class TezResourceManager {
+    private final Path stagingDir;
+    private final PigContext pigContext;
+    // Retained from construction; the remote FS handle below is derived from it.
+    private final Configuration conf;
+    private final URL bootStrapJar;
+    private final FileSystem remoteFs;
+    // Resource name -> path of that resource on the remote FS.
+    public Map<String, Path> resources = new HashMap<String, Path>();
+
+    /**
+     * @return URL of the jar that contains org.apache.pig.Main, as resolved
+     *         when this manager was constructed
+     */
+    public URL getBootStrapJar() {
+        return bootStrapJar;
+    }
+
+    /**
+     * Creates the manager and immediately ships the bootstrap jar to the
+     * staging directory.
+     *
+     * @param stagingDir directory on the remote FS that receives shipped files
+     * @param pigContext context handed to JarManager when building the bootstrap jar
+     * @param conf configuration used to obtain the remote FileSystem
+     * @throws IOException if the file system cannot be obtained or the
+     *         bootstrap jar cannot be built or copied
+     */
+    public TezResourceManager(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+        this.stagingDir = stagingDir;
+        this.pigContext = pigContext;
+        this.conf = conf;
+        String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
+        this.bootStrapJar = new File(jar).toURI().toURL();
+        this.remoteFs = FileSystem.get(conf);
+        addBootStrapJar();
+    }
+
+    /**
+     * Add a file from the source FS as a resource. The resource name will be
+     * the same as the file name. A name that is already registered is not
+     * copied again; its existing remote path is returned.
+     *
+     * @param url location of the file to ship
+     * @return qualified path of the resource on the remote FS
+     * @throws IOException if the copy to the staging directory fails
+     */
+    public Path addTezResource(URL url) throws IOException {
+        Path resourcePath = new Path(url.getFile());
+        String resourceName = resourcePath.getName();
+
+        Path registered = resources.get(resourceName);
+        if (registered != null || resources.containsKey(resourceName)) {
+            return registered;
+        }
+
+        // Ship the resource to the staging directory on the remote FS
+        Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
+        remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
+        resources.put(resourceName, remoteFsPath);
+        return remoteFsPath;
+    }
+
+    /**
+     * Add a file already present in the remote FS as a resource. The resource
+     * name is allowed to differ from the file name to support resource
+     * aliasing in a CACHE statement (and to allow the same file to be aliased
+     * with multiple resource names). An already registered name is left
+     * untouched.
+     *
+     * @param resourceName name to register the file under
+     * @param remoteFsPath path of the file on the remote FS
+     * @throws IOException declared for callers; not thrown by this implementation
+     */
+    public void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
+        if (!resources.containsKey(resourceName)) {
+            resources.put(resourceName, remoteFsPath);
+        }
+    }
+
+    /**
+     * Registers every URL in the set and returns the LocalResource
+     * descriptors for exactly those files, keyed by file name.
+     *
+     * @param resources files to ship
+     * @return map from resource name to its LocalResource descriptor
+     * @throws Exception if shipping a file or reading its remote status fails
+     */
+    public Map<String, LocalResource> addTezResources(Set<URL> resources) throws Exception {
+        Set<String> resourceNames = new HashSet<String>();
+        for (URL url : resources) {
+            addTezResource(url);
+            resourceNames.add(new Path(url.getFile()).getName());
+        }
+        return getTezResources(resourceNames);
+    }
+
+    /**
+     * Builds the bootstrap jar into a temporary local file and ships it to the
+     * staging directory, registering it under the file name of the jar that
+     * contains org.apache.pig.Main. Does nothing when that name is already
+     * registered.
+     *
+     * @throws IOException if the jar cannot be written or copied
+     */
+    public void addBootStrapJar() throws IOException {
+        // The map is keyed by file name, so the lookup must use the name and
+        // not the URL itself (a URL key can never match a String key).
+        String bootStrapJarName = new Path(bootStrapJar.getFile()).getName();
+        if (resources.containsKey(bootStrapJarName)) {
+            return;
+        }
+
+        File jobJar = File.createTempFile("Job", ".jar");
+        jobJar.deleteOnExit();
+        FileOutputStream fos = new FileOutputStream(jobJar);
+        try {
+            JarManager.createBootStrapJar(fos, pigContext);
+        } finally {
+            // Release the file handle before the file is copied, even on failure.
+            fos.close();
+        }
+
+        // Ship the job.jar to the staging directory on the remote FS
+        Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, bootStrapJarName));
+        remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
+        resources.put(bootStrapJarName, remoteJarPath);
+    }
+
+    /**
+     * Creates LocalResource descriptors for the named resources, using the
+     * length and modification time reported by the remote FS.
+     *
+     * @param resourceNames names of previously registered resources
+     * @return map from resource name to its LocalResource descriptor
+     * @throws IllegalArgumentException if a name has not been registered
+     * @throws Exception if the remote file status cannot be read
+     */
+    public Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
+        Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
+        for (String resourceName : resourceNames) {
+            // The resource name will be symlinked to the resource path in the
+            // container's working directory.
+            Path resourcePath = resources.get(resourceName);
+            if (resourcePath == null) {
+                throw new IllegalArgumentException("Resource " + resourceName + " has not been registered");
+            }
+            FileStatus fstat = remoteFs.getFileStatus(resourcePath);
+
+            LocalResource tezResource = LocalResource.newInstance(
+                    ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
+                    LocalResourceType.FILE,
+                    LocalResourceVisibility.APPLICATION,
+                    fstat.getLen(),
+                    fstat.getModificationTime());
+
+            tezResources.put(resourceName, tezResource);
+        }
+        return tezResources;
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri May 30 19:07:23 2014
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
+import org.apache.pig.impl.PigContext;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Static pool of Tez sessions. A session is handed out by
+ * {@link #getSession}, returned with {@link #freeSession} and stopped with
+ * {@link #stopSession}; a JVM shutdown hook stops whatever is left in the pool.
+ * All access to the pool is synchronized on the pool itself.
+ */
+public class TezSessionManager {
+
+    private static final Log log = LogFactory.getLog(TezSessionManager.class);
+
+    static {
+        // Make sure pooled sessions are stopped when the JVM exits.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+
+            @Override
+            public void run() {
+                TezSessionManager.shutdown();
+            }
+        });
+    }
+
+    private TezSessionManager() {
+    }
+
+    /**
+     * A pooled session together with the AM resources it was created with and
+     * a flag telling whether it is currently handed out.
+     */
+    public static class SessionInfo {
+        SessionInfo(TezSession session, Map<String, LocalResource> resources) {
+            this.session = session;
+            this.resources = resources;
+        }
+        public Map<String, LocalResource> getResources() {
+            return resources;
+        }
+        public TezSession getTezSession() {
+            return session;
+        }
+        public void setInUse(boolean inUse) {
+            this.inUse = inUse;
+        }
+        private TezSession session;
+        private Map<String, LocalResource> resources;
+        private boolean inUse = false;
+    }
+
+    // Also serves as the monitor for every pool operation, hence final.
+    private static final List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
+
+    /**
+     * Polls the session status every 100 ms until it reports READY.
+     *
+     * @throws RuntimeException if the session reports SHUTDOWN
+     * @throws IOException if the waiting thread is interrupted; the interrupt
+     *         status is restored before the exception is thrown
+     */
+    private static void waitForTezSessionReady(TezSession tezSession)
+        throws IOException, TezException {
+        while (true) {
+            TezSessionStatus status = tezSession.getSessionStatus();
+            if (status.equals(TezSessionStatus.SHUTDOWN)) {
+                //TODO: TEZ-1017 Show diagnostics message
+                //log.error("TezSession has already shutdown. Diagnostics: " + tezSession.getSessionDiagnostics());
+                throw new RuntimeException("TezSession has already shutdown");
+            }
+            if (status.equals(TezSessionStatus.READY)) {
+                return;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // Converting to IOException would otherwise lose the interrupt.
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while trying to check session status", e);
+            }
+        }
+    }
+
+    /**
+     * Creates a new Tez application, starts a session for it named after
+     * PigContext.JOB_NAME (default "pig") and blocks until it is ready.
+     */
+    private static SessionInfo createSession(Configuration conf, Map<String, LocalResource> requestedAMResources, Credentials creds) throws TezException, IOException {
+        TezConfiguration tezConf = new TezConfiguration(conf);
+        AMConfiguration amConfig = getAMConfig(tezConf, requestedAMResources, creds);
+        TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+
+        String jobName = conf.get(PigContext.JOB_NAME, "pig");
+        TezClient tezClient = new TezClient(tezConf);
+        ApplicationId appId = tezClient.createApplication();
+
+        TezSession tezSession = new TezSession(jobName, appId, sessionConfig);
+        tezSession.start();
+        waitForTezSessionReady(tezSession);
+        return new SessionInfo(tezSession, requestedAMResources);
+    }
+
+    /**
+     * Assembles the AM configuration from the MR-derived DAG AM settings, the
+     * MR AM environment, the given resources and the credentials.
+     */
+    private static AMConfiguration getAMConfig(TezConfiguration tezConf, Map<String, LocalResource> resources, Credentials creds) {
+        TezConfiguration dagAMConf = MRToTezHelper.getDAGAMConfFromMRConf(tezConf);
+        Map<String, String> amEnv = new HashMap<String, String>();
+        MRHelpers.updateEnvironmentForMRAM(tezConf, amEnv);
+
+        return new AMConfiguration(amEnv, resources, dagAMConf, creds);
+    }
+
+    /**
+     * @return true when every requested resource entry (name and descriptor)
+     *         is already part of the session's resources
+     */
+    private static boolean validateSessionResources(SessionInfo currentSession, Map<String, LocalResource> requestedAMResources) throws TezException, IOException {
+        return currentSession.resources.entrySet().containsAll(requestedAMResources.entrySet());
+    }
+
+    /**
+     * Returns an idle, READY session whose resources cover the requested ones,
+     * or creates a new session when none qualifies. Sessions found in
+     * SHUTDOWN state along the way are dropped from the pool. The returned
+     * session is marked as in use.
+     *
+     * @param conf configuration for a newly created session
+     * @param requestedAMResources resources the session's AM must provide
+     * @param creds credentials for a newly created session
+     * @return a session reserved for the caller
+     * @throws TezException if querying or creating a session fails
+     * @throws IOException if querying or creating a session fails
+     */
+    static TezSession getSession(Configuration conf, Map<String, LocalResource> requestedAMResources, Credentials creds) throws TezException, IOException {
+        synchronized (sessionPool) {
+            Iterator<SessionInfo> iter = sessionPool.iterator();
+            while (iter.hasNext()) {
+                SessionInfo sessionInfo = iter.next();
+                // Query once so both checks below see the same status.
+                TezSessionStatus status = sessionInfo.session.getSessionStatus();
+                if (status == TezSessionStatus.SHUTDOWN) {
+                    iter.remove();
+                } else if (!sessionInfo.inUse && status == TezSessionStatus.READY &&
+                        validateSessionResources(sessionInfo, requestedAMResources)) {
+                    sessionInfo.inUse = true;
+                    return sessionInfo.session;
+                }
+            }
+
+            // We cannot find available AM, create new one
+            SessionInfo sessionInfo = createSession(conf, requestedAMResources, creds);
+            sessionInfo.inUse = true;
+            sessionPool.add(sessionInfo);
+            return sessionInfo.session;
+        }
+    }
+
+    /**
+     * Marks the given session as idle again. Unknown sessions are ignored.
+     */
+    static void freeSession(TezSession session) {
+        synchronized (sessionPool) {
+            for (SessionInfo sessionInfo : sessionPool) {
+                if (sessionInfo.session == session) {
+                    sessionInfo.inUse = false;
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Stops the given session and removes it from the pool. Unknown sessions
+     * are ignored.
+     *
+     * @throws TezException if stopping the session fails
+     * @throws IOException if stopping the session fails
+     */
+    static void stopSession(TezSession session) throws TezException, IOException {
+        synchronized (sessionPool) {
+            Iterator<SessionInfo> iter = sessionPool.iterator();
+            while (iter.hasNext()) {
+                SessionInfo sessionInfo = iter.next();
+                if (sessionInfo.session == session) {
+                    log.info("Shutting down Tez session " + session);
+                    session.stop();
+                    iter.remove();
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Stops every pooled session and empties the pool. A failure to stop one
+     * session is logged and does not prevent the others from being stopped.
+     */
+    @VisibleForTesting
+    public static void shutdown() {
+        synchronized (sessionPool) {
+            for (SessionInfo sessionInfo : sessionPool) {
+                try {
+                    log.info("Shutting down Tez session " + sessionInfo.session);
+                    sessionInfo.session.stop();
+                } catch (Exception e) {
+                    log.error("Error shutting down Tez session "  + sessionInfo.session, e);
+                }
+            }
+            sessionPool.clear();
+        }
+    }
+}
+

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java Fri May 30 19:07:23 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+/**
+ * This interface is implemented by PhysicalOperators that need access to the
+ * TezProcessorContext of a Tez task.
+ */
+
+public interface TezTaskConfigurable {
+
+    /**
+     * Receives the processor context; what is done with it is up to the
+     * implementing operator.
+     *
+     * @param processorContext the TezProcessorContext made available to the operator
+     * @throws ExecException if the implementation cannot initialize itself
+     */
+    public void initialize(TezProcessorContext processorContext) throws ExecException;
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java Fri May 30 19:07:23 2014
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters.MRCounter;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+/**
+ * TaskContext backed by a TezProcessorContext. Counter lookups go through the
+ * context's Tez counters; when no context was supplied the getters return
+ * null and the increment methods return false.
+ */
+public class TezTaskContext extends TaskContext<TezProcessorContext> {
+    private TezProcessorContext context;
+
+    /**
+     * @param context the processor context to wrap; may be null
+     */
+    public TezTaskContext(TezProcessorContext context) {
+        this.context = context;
+    }
+
+    /**
+     * @return the wrapped processor context, exactly as passed to the constructor
+     */
+    @Override
+    public TezProcessorContext get() {
+        return context;
+    }
+
+    /**
+     * Looks up the Tez counter for the enum and wraps it as an MRCounter.
+     *
+     * @return the wrapped counter, or null when there is no context
+     */
+    @Override
+    public Counter getCounter(Enum<?> name) {
+        if (context != null) {
+            return new MRCounter(context.getCounters().findCounter(name));
+        }
+        return null;
+    }
+
+    /**
+     * Looks up the named Tez counter in the given group and wraps it as an
+     * MRCounter.
+     *
+     * @return the wrapped counter, or null when there is no context
+     */
+    @Override
+    public Counter getCounter(String group, String name) {
+        if (context != null) {
+            return new MRCounter(context.getCounters().getGroup(group).findCounter(name));
+        }
+        return null;
+    }
+
+    /**
+     * Adds delta to the Tez counter identified by the enum.
+     *
+     * @return true when the counter was incremented, false when there is no context
+     */
+    @Override
+    public boolean incrCounter(Enum<?> name, long delta) {
+        if (context != null) {
+            TezCounter target = context.getCounters().findCounter(name);
+            target.increment(delta);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Adds delta to the named Tez counter of the given group.
+     *
+     * @return true when the counter was incremented, false when there is no context
+     */
+    @Override
+    public boolean incrCounter(String group, String name, long delta) {
+        if (context != null) {
+            TezCounter target = context.getCounters().getGroup(group).findCounter(name);
+            target.increment(delta);
+            return true;
+        }
+        return false;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalMap;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * WeightedRangePartitioner variant for Tez that takes its quantile data from
+ * PigProcessor.sampleMap and keeps the derived quantiles and weighted
+ * partitions in the ObjectCache, keyed by the sample vertex, so later calls
+ * can reuse them.
+ */
+public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
+    private static final Log LOG = LogFactory.getLog(WeightedRangePartitionerTez.class);
+
+    /**
+     * Initializes quantiles and weightedParts. The cached copies are used when
+     * the cache holds the "cached" marker for the sample vertex; otherwise the
+     * values are computed from PigProcessor.sampleMap and then cached. When no
+     * sample map is available the partitioner is marked as initialized
+     * without computing anything.
+     *
+     * @throws RuntimeException wrapping any failure while converting the sample data
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init() {
+        ObjectCache cache = ObjectCache.getInstance();
+        String keyPrefix = "sample-" + PigProcessor.sampleVertex;
+        String isCachedKey = keyPrefix + ".cached";
+        String quantilesCacheKey = keyPrefix + ".quantiles";
+        String weightedPartsCacheKey = keyPrefix + ".weightedParts";
+        // equals() rather than ==: a boxed Boolean must not be compared by reference.
+        if (Boolean.TRUE.equals(cache.retrieve(isCachedKey))) {
+            quantiles = (PigNullableWritable[]) cache
+                    .retrieve(quantilesCacheKey);
+            weightedParts = (Map<PigNullableWritable, DiscreteProbabilitySampleGenerator>) cache
+                    .retrieve(weightedPartsCacheKey);
+            LOG.info("Found quantiles and weightedParts in Tez cache. cachekey="
+                    + quantilesCacheKey + "," + weightedPartsCacheKey);
+            inited = true;
+            return;
+        }
+
+        // We've collected sampleMap in PigProcessor, if there was any sample
+        Map<String, Object> quantileMap = PigProcessor.sampleMap;
+        if (quantileMap == null) {
+            LOG.info("Quantiles map is empty");
+            inited = true;
+            return;
+        }
+
+        long start = System.currentTimeMillis();
+        try {
+            DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+            InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
+            convertToArray(quantilesList);
+            for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
+                Tuple key = (Tuple) ent.getKey(); // sample item which repeats
+                float[] probVec = getProbVec((Tuple) ent.getValue());
+                weightedParts.put(getPigNullableWritable(key),
+                        new DiscreteProbabilitySampleGenerator(probVec));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        LOG.info("Initialized WeightedRangePartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+        cache.cache(isCachedKey, Boolean.TRUE);
+        cache.cache(quantilesCacheKey, quantiles);
+        cache.cache(weightedPartsCacheKey, weightedParts);
+        inited = true;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * POCounterStatsTez is used to group counters from previous vertex POCounterTez tasks.
+ * It reads one record count per task from its input, turns the counts into
+ * per-task starting offsets and writes the offsets map to its output.
+ */
+public class POCounterStatsTez extends PhysicalOperator implements TezInput, TezOutput {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POCounterStatsTez.class);
+    private String inputKey;
+    private String outputKey;
+    // TODO: Even though we expect only one record from POCounter, because of Shuffle we have
+    // KeyValuesReader. After TEZ-661, switch to unsorted shuffle
+    private transient KeyValuesReader reader;
+    private transient KeyValueWriter writer;
+
+    public POCounterStatsTez(OperatorKey k) {
+        super(k);
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    public void setOutputKey(String outputKey) {
+        this.outputKey = outputKey;
+    }
+
+    /** @return a one-element array holding the input key */
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    /** Swaps the input key for newInputKey when it currently equals oldInputKey. */
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (!oldInputKey.equals(inputKey)) {
+            return;
+        }
+        inputKey = newInputKey;
+    }
+
+    /** This operator never adds inputs to skip. */
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    /**
+     * Fetches the input registered under the input key and keeps its reader.
+     *
+     * @throws ExecException if the input is absent or its reader cannot be obtained
+     */
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf)
+            throws ExecException {
+        LogicalInput source = inputs.get(inputKey);
+        if (source == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValuesReader) source.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + source + ", reader=" + reader);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /** @return a one-element array holding the output key */
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey };
+    }
+
+    /** Swaps the output key for newOutputKey when it currently equals oldOutputKey. */
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (!oldOutputKey.equals(outputKey)) {
+            return;
+        }
+        outputKey = newOutputKey;
+    }
+
+    /**
+     * Fetches the output registered under the output key and keeps its writer.
+     *
+     * @throws ExecException if the output is absent or its writer cannot be obtained
+     */
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        LogicalOutput sink = outputs.get(outputKey);
+        if (sink == null) {
+            throw new ExecException("Output to vertex " + outputKey + " is missing");
+        }
+        try {
+            writer = (KeyValueWriter) sink.getWriter();
+            LOG.info("Attached output to vertex " + outputKey + " : output=" + sink + ", writer=" + writer);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Drains the reader, computes the offsets map and writes it as a
+     * single-field tuple under POValueOutputTez.EMPTY_KEY.
+     *
+     * @return always RESULT_EOP
+     * @throws ExecException wrapping any IOException raised while reading or writing
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            // Read count of records per task; the last value seen for a task wins
+            Map<Integer, Long> countsByTask = new HashMap<Integer, Long>();
+            while (reader.next()) {
+                int taskId = ((IntWritable) reader.getCurrentKey()).get();
+                for (Object rawCount : reader.getCurrentValues()) {
+                    long count = ((LongWritable) rawCount).get();
+                    countsByTask.put(taskId, count);
+                }
+            }
+
+            // Create a map to contain task ids and beginning offset of record count
+            // based on total record count of all tasks
+            // For eg: If Task 0 has 5 records, Task 1 has 10 records and Task 2 has 3 records
+            // map will contain {0=0, 1=5, 2=15}
+            // BinInterSedes only takes String for map key
+            Map<String, Long> offsetsByTask = new HashMap<String, Long>();
+            offsetsByTask.put("0", 0L);
+            Long runningTotal = countsByTask.get(0);
+            int taskCount = countsByTask.size();
+            int taskId = 1;
+            while (taskId < taskCount) {
+                offsetsByTask.put(String.valueOf(taskId), runningTotal);
+                runningTotal = runningTotal + countsByTask.get(taskId);
+                taskId++;
+            }
+
+            Tuple offsetsTuple = TupleFactory.getInstance().newTuple(1);
+            offsetsTuple.set(0, offsetsByTask);
+            writer.write(POValueOutputTez.EMPTY_KEY, offsetsTuple);
+            return RESULT_EOP;
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /** @return display label made of the operator key, the input key and the output key */
+    @Override
+    public String name() {
+        StringBuilder label = new StringBuilder("PORankStatsTez - ");
+        label.append(mKey.toString());
+        label.append("\t<-\t ").append(inputKey);
+        label.append("\t->\t ").append(outputKey);
+        return label.toString();
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezTaskConfigurable;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * Tez variant of {@link POCounter} that writes to two Tez outputs instead of
+ * returning tuples to its caller:
+ * <ul>
+ * <li>every input tuple, after {@code addCounterValue} has been applied, is
+ * written to the "tuples" output under {@link POValueOutputTez#EMPTY_KEY};</li>
+ * <li>once the input is exhausted, a single (task id, record total) pair is
+ * written to the "stats" output.</li>
+ * </ul>
+ */
+public class POCounterTez extends POCounter implements TezOutput, TezTaskConfigurable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POCounterTez.class);
+
+    private String tuplesOutputKey;
+    private String statsOutputKey;
+
+    private transient KeyValueWriter tuplesWriter;
+    private transient KeyValueWriter statsWriter;
+    // Running total reported on the stats output at end of input.
+    private transient long totalTaskRecords = 0;
+
+    /**
+     * Creates the operator by delegating to the {@link POCounter} copy
+     * constructor.
+     *
+     * @param copy the counter operator to copy
+     */
+    public POCounterTez(POCounter copy) {
+        super(copy);
+    }
+
+    /**
+     * Uses the task index reported by the Tez processor context as this
+     * operator's task id.
+     */
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+            throws ExecException {
+        this.setTaskId(processorContext.getTaskIndex());
+    }
+
+    /**
+     * @param tuplesOutputKey key of the output that receives the tuples
+     */
+    public void setTuplesOutputKey(String tuplesOutputKey) {
+        this.tuplesOutputKey = tuplesOutputKey;
+    }
+
+    /**
+     * @param statsOutputKey key of the output that receives the per-task
+     *            record total
+     */
+    public void setStatsOutputKey(String statsOutputKey) {
+        this.statsOutputKey = statsOutputKey;
+    }
+
+    /**
+     * @return the tuples output key followed by the stats output key
+     */
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { tuplesOutputKey, statsOutputKey };
+    }
+
+    /**
+     * Renames whichever of the two output keys equals {@code oldOutputKey};
+     * does nothing when neither matches.
+     */
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (oldOutputKey.equals(tuplesOutputKey)) {
+            tuplesOutputKey = newOutputKey;
+        } else if (oldOutputKey.equals(statsOutputKey)) {
+            statsOutputKey = newOutputKey;
+        }
+    }
+
+    /**
+     * Looks up both outputs and keeps their writers for
+     * {@link #getNextTuple()}.
+     *
+     * @param outputs outputs available to the task, keyed by output key
+     * @param conf not used by this operator
+     * @throws ExecException if either output is absent or its writer cannot
+     *             be obtained
+     */
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        tuplesWriter = attachWriter(outputs, tuplesOutputKey);
+        statsWriter = attachWriter(outputs, statsOutputKey);
+    }
+
+    /**
+     * Resolves the writer of the output registered under {@code outputKey}.
+     */
+    private KeyValueWriter attachWriter(Map<String, LogicalOutput> outputs,
+            String outputKey) throws ExecException {
+        LogicalOutput output = outputs.get(outputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + outputKey + " is missing");
+        }
+        try {
+            KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+            return writer;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Drains the input, writing every tuple to the tuples output, then
+     * writes this task's record total to the stats output.
+     *
+     * @return {@code RESULT_EOP} once all input has been written, or the
+     *         input's own result when it reports {@code STATUS_ERR}
+     * @throws ExecException if reading the input or writing an output fails
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            while (true) {
+                Result inp = processInput();
+                if (inp.returnStatus == POStatus.STATUS_ERR) {
+                    // Hand the failure back to the caller rather than
+                    // reporting a clean end of input; the stats record is
+                    // skipped because the input was not fully consumed.
+                    return inp;
+                }
+                if (inp.returnStatus == POStatus.STATUS_EOP) {
+                    break;
+                }
+                if (inp.returnStatus == POStatus.STATUS_NULL) {
+                    continue;
+                }
+
+                tuplesWriter.write(POValueOutputTez.EMPTY_KEY,
+                        addCounterValue(inp).result);
+            }
+
+            statsWriter.write(new IntWritable(this.getTaskId()), new LongWritable(totalTaskRecords));
+
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+        return RESULT_EOP;
+    }
+
+    /**
+     * Adds one to the task total in addition to the superclass behaviour.
+     */
+    @Override
+    protected Long incrementLocalCounter() {
+        totalTaskRecords++;
+        return super.incrementLocalCounter();
+    }
+
+    /**
+     * Adds {@code sizeBag} to the task total in addition to the superclass
+     * behaviour.
+     */
+    @Override
+    protected void addToLocalCounter(Long sizeBag) {
+        super.addToLocalCounter(sizeBag);
+        totalTaskRecords += sizeBag;
+    }
+
+    /**
+     * Adds {@code increment} to the task total. The superclass
+     * implementation is intentionally not invoked here.
+     */
+    @Override
+    protected void incrementReduceCounter(Long increment) {
+        totalTaskRecords += increment;
+    }
+
+    /**
+     * Runs the superclass visit and then dispatches this operator to the
+     * visitor via {@code v.visit(this)}.
+     */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        super.visit(v);
+        v.visit(this);
+    }
+
+    /**
+     * @return a display name made of a fixed label, the operator key and
+     *         both output keys
+     */
+    @Override
+    public String name() {
+        return "POCounterTez - " + mKey.toString() + "\t->\t " + tuplesOutputKey + "," + statsOutputKey;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Tez variant of {@link PORank}. Tuples are read from one Tez input; a
+ * second input supplies a single map from task id to counter offset. The
+ * converted map is stored in {@link ObjectCache} so that a later attach can
+ * reuse it and skip the stats input.
+ */
+public class PORankTez extends PORank implements TezInput {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(PORankTez.class);
+
+    private String tuplesInputKey;
+    private String statsInputKey;
+    // Set by addInputsToSkip when the offsets were already in the cache.
+    private transient boolean isInputCached;
+    // Reader of the tuples input; the stats input uses a local reader.
+    private transient KeyValueReader reader;
+    private transient Map<Integer, Long> counterOffsets;
+    private transient Configuration conf;
+
+    /**
+     * Creates the operator by delegating to the {@link PORank} copy
+     * constructor.
+     *
+     * @param copy the rank operator to copy
+     */
+    public PORankTez(PORank copy) {
+        super(copy);
+    }
+
+    /**
+     * @param tuplesInputKey key of the input that delivers the tuples
+     */
+    public void setTuplesInputKey(String tuplesInputKey) {
+        this.tuplesInputKey = tuplesInputKey;
+    }
+
+    /**
+     * @param statsInputKey key of the input that delivers the offset map
+     */
+    public void setStatsInputKey(String statsInputKey) {
+        this.statsInputKey = statsInputKey;
+    }
+
+    /**
+     * @return the tuples input key followed by the stats input key
+     */
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { tuplesInputKey, statsInputKey };
+    }
+
+    /**
+     * Renames whichever of the two input keys equals {@code oldInputKey};
+     * does nothing when neither matches.
+     */
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(tuplesInputKey)) {
+            tuplesInputKey = newInputKey;
+        } else if (oldInputKey.equals(statsInputKey)) {
+            statsInputKey = newInputKey;
+        }
+    }
+
+    /**
+     * Marks the stats input as skippable when the offsets are already
+     * present in {@link ObjectCache}.
+     */
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        Object cacheValue = ObjectCache.getInstance().retrieve(statsCacheKey());
+        if (cacheValue != null) {
+            isInputCached = true;
+            inputsToSkip.add(statsInputKey);
+        }
+    }
+
+    /**
+     * Attaches the tuples reader and loads the counter offsets, either from
+     * {@link ObjectCache} or from the single record of the stats input.
+     *
+     * @param inputs inputs available to the task, keyed by input key
+     * @param conf configuration kept for the end-of-input check in
+     *            {@link #getNextTuple()}
+     * @throws ExecException if a required input is absent, the stats input
+     *             holds no record, or reading it fails
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        this.conf = conf;
+        LogicalInput input = inputs.get(tuplesInputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + tuplesInputKey + " is missing");
+        }
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+
+        String cacheKey = statsCacheKey();
+        if (isInputCached) {
+            counterOffsets = (Map<Integer, Long>) ObjectCache.getInstance().retrieve(cacheKey);
+            LOG.info("Found counter stats for PORankTez in Tez cache. cachekey=" + cacheKey);
+            return;
+        }
+        input = inputs.get(statsInputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + statsInputKey + " is missing");
+        }
+        try {
+            // Named separately so it does not hide the tuples reader field.
+            KeyValueReader statsReader = (KeyValueReader) input.getReader();
+            LOG.info("Attached input from vertex " + statsInputKey + " : input=" + input + ", reader=" + statsReader);
+            if (!statsReader.next()) {
+                throw new ExecException("No counter stats received from vertex " + statsInputKey);
+            }
+            // POCounterStatsTez produces a HashMap which contains
+            // mapping of task id and the offset of record count in each task based on total record count
+            Map<String, Long> counterOffsetsTemp = (Map<String, Long>) ((Tuple) statsReader.getCurrentValue()).get(0);
+            counterOffsets = new HashMap<Integer, Long>(counterOffsetsTemp.size(), 1);
+            for (Entry<String, Long> entry : counterOffsetsTemp.entrySet()) {
+                counterOffsets.put(Integer.valueOf(entry.getKey()), entry.getValue());
+            }
+            ObjectCache.getInstance().cache(cacheKey, counterOffsets);
+            LOG.info("Cached PORankTez counter stats in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
+        } catch (ExecException e) {
+            // Already the right type; keep its message at the top level.
+            throw e;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Builds the {@link ObjectCache} key under which the offsets are stored.
+     */
+    private String statsCacheKey() {
+        return "rankstats-" + getOperatorKey().toString();
+    }
+
+    /**
+     * Reads the next tuple from the tuples input and passes it through
+     * {@code addRank}.
+     *
+     * @return the ranked result, or {@code RESULT_EOP} when the input is
+     *         exhausted
+     * @throws ExecException if reading the input fails
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            // One record per call, so a conditional is enough here.
+            if (reader.next()) {
+                Result inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                return addRank(inp);
+            }
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+
+        // For certain operators (such as STREAM), we could still have some work
+        // to do even after seeing the last input. These operators set a flag that
+        // says all input has been sent and to run the pipeline one more time.
+        if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
+            this.parentPlan.endOfAllInput = true;
+        }
+        return RESULT_EOP;
+    }
+
+    /**
+     * @param taskId id of the task that produced the tuple
+     * @return {@code 0} when an illustrator is set; otherwise the offset
+     *         stored for {@code taskId}, which is {@code null} when the map
+     *         has no entry for it
+     */
+    @Override
+    protected Long getRankCounterOffset(Integer taskId) {
+        if (illustrator != null) {
+            return 0L;
+        }
+        return counterOffsets.get(taskId);
+    }
+
+    /**
+     * Runs the superclass visit and then dispatches this operator to the
+     * visitor via {@code v.visit(this)}.
+     */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        super.visit(v);
+        v.visit(this);
+    }
+
+    /**
+     * @return a display name made of a fixed label, the operator key and
+     *         both input keys
+     */
+    @Override
+    public String name() {
+        return "PORankTez - " + mKey.toString() + "\t<-\t " + tuplesInputKey + "," + statsInputKey;
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * POShuffledValueInputTez is used to read tuples from a Tez intermediate
+ * output that arrives over a shuffle edge. To be used with POValueOutputTez.
+ * TODO: To be removed after PIG-3775 and TEZ-661
+ */
+public class POShuffledValueInputTez extends PhysicalOperator implements TezInput {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POShuffledValueInputTez.class);
+    private Set<String> inputKeys = new HashSet<String>();
+    // True once every reader has been drained.
+    private transient boolean finished = false;
+    private transient Iterator<KeyValueReader> readers;
+    private transient KeyValueReader currentReader;
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+    private transient Configuration conf;
+
+    /**
+     * @param k operator key handed to the superclass constructor
+     */
+    public POShuffledValueInputTez(OperatorKey k) {
+        super(k);
+    }
+
+    /**
+     * @return the registered input keys as a new array
+     */
+    @Override
+    public String[] getTezInputs() {
+        String[] keys = new String[inputKeys.size()];
+        return inputKeys.toArray(keys);
+    }
+
+    /**
+     * Swaps {@code oldInputKey} for {@code newInputKey}; nothing happens
+     * when the old key is not registered.
+     */
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        boolean wasRegistered = inputKeys.remove(oldInputKey);
+        if (!wasRegistered) {
+            return;
+        }
+        inputKeys.add(newInputKey);
+    }
+
+    /**
+     * This operator never asks for an input to be skipped.
+     */
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    /**
+     * Collects one reader per registered input key and positions the
+     * operator on the first of them.
+     *
+     * @param inputs inputs available to the task, keyed by input key
+     * @param conf configuration kept for the end-of-input check in
+     *            {@link #getNextTuple()}
+     * @throws ExecException wrapping any failure raised while attaching,
+     *             including a missing input
+     */
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf) throws ExecException {
+        this.conf = conf;
+        List<KeyValueReader> collected = new ArrayList<KeyValueReader>();
+        try {
+            for (String key : inputKeys) {
+                LogicalInput source = inputs.get(key);
+                if (source == null) {
+                    throw new ExecException("Input from vertex " + key
+                            + " is missing");
+                }
+
+                KeyValueReader sourceReader = (KeyValueReader) source.getReader();
+                collected.add(sourceReader);
+                String message = "Attached input from vertex " + key + " : input="
+                        + source + ", reader=" + sourceReader;
+                LOG.info(message);
+            }
+            readers = collected.iterator();
+            currentReader = readers.next();
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Returns a copy of the next tuple, moving on to the following reader
+     * whenever the current one is exhausted.
+     *
+     * @return a {@code STATUS_OK} result holding the copied tuple, or
+     *         {@code RESULT_EOP} once all readers are drained
+     * @throws ExecException if reading from a reader fails
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            if (finished) {
+                return RESULT_EOP;
+            }
+
+            while (true) {
+                if (currentReader.next()) {
+                    Tuple source = (Tuple) currentReader.getCurrentValue();
+                    Tuple duplicate = mTupleFactory.newTuple(source.getAll());
+                    return new Result(POStatus.STATUS_OK, duplicate);
+                }
+                if (!readers.hasNext()) {
+                    currentReader = null;
+                    break;
+                }
+                currentReader = readers.next();
+            }
+
+            finished = true;
+            // For certain operators (such as STREAM), we could still have some work
+            // to do even after seeing the last input. These operators set a flag that
+            // says all input has been sent and to run the pipeline one more time.
+            String endOfInputFlag = conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false");
+            if (Boolean.parseBoolean(endOfInputFlag)) {
+                this.parentPlan.endOfAllInput = true;
+            }
+            return RESULT_EOP;
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * @param inputKey key of an additional input to read from
+     */
+    public void addInputKey(String inputKey) {
+        this.inputKeys.add(inputKey);
+    }
+
+    /**
+     * Not implemented for this operator: always returns {@code null}.
+     */
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * Dispatches this operator to the visitor via {@code v.visit(this)}.
+     */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /**
+     * @return always {@code false}
+     */
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * @return a display name made of a fixed label, the operator key and
+     *         the set of input keys
+     */
+    @Override
+    public String name() {
+        StringBuilder label = new StringBuilder("POShuffledValueInputTez - ");
+        label.append(mKey.toString());
+        label.append("\t<-\t ").append(inputKeys);
+        return label.toString();
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java Fri May 30 19:07:23 2014
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.optimizers;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons splits always produce filters that pass everything
+ * through unchanged. This optimizer removes them.
+ *
+ * The condition we look for is a POFilter whose plan consists of a single
+ * constant boolean (true) expression.
+ */
+public class NoopFilterRemover extends TezOpPlanVisitor {
+
+    private static Log LOG = LogFactory.getLog(NoopFilterRemover.class);
+
+    /**
+     * @param plan the Tez operator plan to walk, in dependency order
+     */
+    public NoopFilterRemover(TezOperPlan plan) {
+        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    /**
+     * Removes every always-true filter from the physical plan of a Tez
+     * operator that has a split parent; other operators are left untouched.
+     *
+     * @param tezOp the operator being visited
+     * @throws VisitorException wrapping any {@link PlanException} raised
+     *             while removing a filter
+     */
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        if (tezOp.getSplitParent() == null) {
+            return;
+        }
+        try {
+            List<POFilter> candidates = PlanHelper.getPhysicalOperators(tezOp.plan, POFilter.class);
+            for (POFilter candidate : candidates) {
+                PhysicalPlan condition = candidate.getPlan();
+                if (condition.size() != 1) {
+                    continue;
+                }
+                PhysicalOperator root = condition.getRoots().get(0);
+                if (!(root instanceof ConstantExpression)) {
+                    continue;
+                }
+                Object constant = ((ConstantExpression) root).getValue();
+                // Matches only a Boolean constant whose value is true.
+                if (Boolean.TRUE.equals(constant)) {
+                    tezOp.plan.removeAndReconnect(candidate);
+                }
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Fri May 30 19:07:23 2014
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.optimizers;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.RoundRobinPartitioner;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator.VertexGroupInfo;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+/**
+ * Optimizes union by removing the intermediate union vertex and making the
+ * successor get input from the predecessor vertices directly using VertexGroup.
+ * This should be run after MultiQueryOptimizer so that it handles cases like
+ * union followed by split and then store.
+ *
+ * For eg:
+ * 1) Union followed by store
+ * Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + Store) will be optimized to
+ * Vertex 1 (Load + Store), Vertex 2 (Load + Store). Both the vertices will be writing output
+ * to same store location directly which is supported by Tez.
+ * 2) Union followed by groupby
+ * Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + POLocalRearrange) -> Vertex 4 (Group by)
+ * will be optimized to Vertex 1 (Load + POLR), Vertex 2 (Load + POLR) -> Vertex 4 (Group by)
+ *
+ */
+public class UnionOptimizer extends TezOpPlanVisitor {
+
+    /**
+     * Creates the optimizer for the given plan. The plan is traversed with a
+     * {@link ReverseDependencyOrderWalker}.
+     *
+     * @param plan the Tez operator plan to optimize
+     */
+    public UnionOptimizer(TezOperPlan plan) {
+        super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    /**
+     * Replaces a union operator by vertex groups over its predecessors.
+     * <p>
+     * Operators that are not unions are ignored. A union that has more plan
+     * predecessors than vertex group members is left as is (see the PIG-3856
+     * note below). Otherwise the method:
+     * <ol>
+     * <li>creates one vertex group operator per POStoreTez in the union plan and
+     * one per output key of the remaining TezOutput operators;</li>
+     * <li>removes the first root of the union plan and, for every vertex group
+     * member, replaces that predecessor's leaf with a clone of the remaining
+     * union plan;</li>
+     * <li>copies the union's outgoing edges onto the predecessors, turning
+     * ONE_TO_ONE edges into SCATTER_GATHER edges;</li>
+     * <li>makes successor inputs that referenced the union refer to the new
+     * vertex group keys, then removes the union operator from the plan.</li>
+     * </ol>
+     *
+     * @param tezOp the Tez operator being visited
+     * @throws VisitorException wrapping any exception raised while rewriting the plan
+     */
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        if (!tezOp.isUnion()) {
+            return;
+        }
+
+        TezOperator unionOp = tezOp;
+        String unionOpKey = unionOp.getOperatorKey().toString();
+        String scope = unionOp.getOperatorKey().scope;
+        TezOperPlan tezPlan = getPlan();
+
+        //TODO: PIG-3856 Handle replicated join. Replicate join input that was broadcast to union vertex
+        // now needs to be broadcast to all the union predecessors. How do we do that??
+        // Wait for shared edge and do it or write multiple times??
+        // For now don't optimize
+        // Create a copy as disconnect while iterating modifies the original list
+        List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
+        if (predecessors.size() > unionOp.getVertexGroupMembers().size()) {
+            return;
+        }
+
+        PhysicalPlan unionOpPlan = unionOp.plan;
+
+        // Union followed by Split followed by Store could have multiple stores
+        List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
+        TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()];
+        for (int i=0; i < storeVertexGroupOps.length; i++) {
+            storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
+            storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
+            storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+            tezPlan.add(storeVertexGroupOps[i]);
+        }
+
+        // Case of split, orderby, skewed join, rank, etc will have multiple outputs
+        List<TezOutput> unionOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class);
+        // One TezOutput can write to multiple LogicalOutputs (POCounterTez, POValueOutputTez, etc)
+        List<String> unionOutputKeys = new ArrayList<String>();
+        for (TezOutput output : unionOutputs) {
+            // Stores are already covered by storeVertexGroupOps above
+            if (output instanceof POStoreTez) {
+                continue;
+            }
+            for (String key : output.getTezOutputs()) {
+                unionOutputKeys.add(key);
+            }
+        }
+
+        // Create vertex group operator for each output
+        // newOutputKeys[i] is the operator key of the vertex group created for unionOutputKeys.get(i)
+        TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
+        String[] newOutputKeys = new String[unionOutputKeys.size()];
+        for (int i=0; i < outputVertexGroupOps.length; i++) {
+            outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
+            outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
+            outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
+            outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+            newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString();
+            tezPlan.add(outputVertexGroupOps[i]);
+        }
+
+        try {
+
+             // Clone plan of union and merge it into the predecessor operators
+             // Remove POShuffledValueInputTez from union plan root
+            unionOpPlan.remove(unionOpPlan.getRoots().get(0));
+            for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
+                TezOperator pred = tezPlan.getOperator(predKey);
+                PhysicalPlan predPlan = pred.plan;
+                PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
+                // if predLeaf not POValueOutputTez
+                if (predLeaf instanceof POSplit) {
+                    // Find the subPlan that connects to the union operator
+                    predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey);
+                    predLeaf = predPlan.getLeaves().get(0);
+                }
+
+                PhysicalPlan clonePlan = unionOpPlan.clone();
+                //Clone changes the operator keys
+                List<POStoreTez> clonedUnionStoreOutputs = PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class);
+
+                // Remove POValueOutputTez from predecessor leaf
+                predPlan.remove(predLeaf);
+                boolean isEmptyPlan = predPlan.isEmpty();
+                if (!isEmptyPlan) {
+                    // The new leaf is what the cloned union plan gets attached to
+                    predLeaf = predPlan.getLeaves().get(0);
+                }
+                predPlan.merge(clonePlan);
+                if (!isEmptyPlan) {
+                    predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+                }
+
+                // Connect predecessor to the storeVertexGroups
+                // NOTE(review): pairs clonedUnionStoreOutputs with storeVertexGroupOps by index, which
+                // assumes the cloned plan lists its stores in the same order as the union plan - confirm
+                int i = 0;
+                for (TezOperator storeVertexGroup : storeVertexGroupOps) {
+                    storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+                    //Set the output key of cloned POStore to that of the initial union POStore.
+                    clonedUnionStoreOutputs.get(i).setOutputKey(
+                            storeVertexGroup.getVertexGroupInfo().getStore()
+                                    .getOperatorKey().toString());
+                    pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
+                            storeVertexGroup.getOperatorKey());
+                    tezPlan.connect(pred, storeVertexGroup);
+                }
+
+                for (TezOperator outputVertexGroup : outputVertexGroupOps) {
+                    outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+                    tezPlan.connect(pred, outputVertexGroup);
+                }
+
+                copyOperatorProperties(pred, unionOp);
+                tezPlan.disconnect(pred, unionOp);
+            }
+
+            List<TezOperator> successors = tezPlan.getSuccessors(unionOp);
+            // Collected so that edges fed by a POValueOutputTez can get the empty-key comparator below
+            List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
+            for (TezOutput tezOutput : unionOutputs) {
+                if (tezOutput instanceof POValueOutputTez) {
+                    valueOnlyOutputs.add(tezOutput);
+                }
+            }
+            // Connect to outputVertexGroupOps
+            // Copy output edges of union -> successor to predecessor->successor, vertexgroup -> successor
+            // and connect vertexgroup -> successor in the plan.
+            for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
+                TezOperator succOp = tezPlan.getOperator(entry.getKey());
+                // Case of union followed by union.
+                // unionOp.outEdges will not point to vertex group, but to its output.
+                // So find the vertex group if there is one.
+                TezOperator succOpVertexGroup = null;
+                for (TezOperator succ : successors) {
+                    if (succ.isVertexGroup()
+                            && succ.getVertexGroupInfo().getOutput()
+                                    .equals(succOp.getOperatorKey().toString())) {
+                        succOpVertexGroup = succ;
+                        break;
+                    }
+                }
+                TezEdgeDescriptor edge = entry.getValue();
+                // Edge cannot be one to one as it will get input from two or
+                // more union predecessors. Change it to SCATTER_GATHER
+                if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
+                    edge.dataMovementType = DataMovementType.SCATTER_GATHER;
+                    edge.partitionerClass = RoundRobinPartitioner.class;
+                    edge.outputClassName = OnFileSortedOutput.class.getName();
+                    edge.inputClassName = ShuffledMergedInput.class.getName();
+
+                    for (TezOutput tezOutput : valueOnlyOutputs) {
+                        if (ArrayUtils.contains(tezOutput.getTezOutputs(), entry.getKey().toString())) {
+                            edge.setIntermediateOutputKeyComparatorClass(
+                                    POValueOutputTez.EmptyWritableComparator.class.getName());
+                        }
+                    }
+                }
+                // NOTE(review): indexOf returns -1 if the edge target is not among unionOutputKeys,
+                // which would fail the array access - presumably every out edge has a matching output key
+                TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
+                for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+                    TezOperator pred = tezPlan.getOperator(predKey);
+                    // Keep the output edge directly to successor
+                    // Don't need to keep output edge for vertexgroup
+                    pred.outEdges.put(entry.getKey(), edge);
+                    succOp.inEdges.put(predKey, edge);
+                    if (succOpVertexGroup != null) {
+                        succOpVertexGroup.getVertexGroupMembers().add(predKey);
+                        succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
+                        // Connect directly to the successor vertex group
+                        tezPlan.disconnect(pred, vertexGroupOp);
+                        tezPlan.connect(pred, succOpVertexGroup);
+                    }
+                }
+                if (succOpVertexGroup != null) {
+                    succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+                    succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
+                    //Discard the new vertex group created
+                    tezPlan.remove(vertexGroupOp);
+                } else {
+                    tezPlan.connect(vertexGroupOp, succOp);
+                }
+            }
+        } catch (Exception e) {
+            throw new VisitorException(e);
+        }
+
+        List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
+        // Create a copy as disconnect while iterating modifies the original list
+        List<TezOperator> successors = succs == null ? null : new ArrayList<TezOperator>(succs);
+        if (successors != null) {
+            // Successor inputs should now point to the vertex groups.
+            for (TezOperator succ : successors) {
+                LinkedList<TezInput> inputs = PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
+                for (TezInput input : inputs) {
+                    for (String key : input.getTezInputs()) {
+                        if (key.equals(unionOpKey)) {
+                            input.replaceInput(key,
+                                    newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]);
+                        }
+                    }
+                }
+                tezPlan.disconnect(unionOp, succ);
+            }
+        }
+
+        //Remove union operator from the plan
+        tezPlan.remove(unionOp);
+
+    }
+
+    /**
+     * Carries operator-level settings of the union over to a predecessor:
+     * the predecessor's secondary-key flag is overwritten with the union's,
+     * and the union's UDFs and scalars are added to the predecessor's.
+     *
+     * @param pred the predecessor operator that receives the properties
+     * @param unionOp the union operator the properties are read from
+     */
+    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) {
+        pred.setUseSecondaryKey(unionOp.isUseSecondaryKey());
+        pred.UDFs.addAll(unionOp.UDFs);
+        pred.scalars.addAll(unionOp.scalars);
+    }
+
+    /**
+     * Finds, among the sub-plans of the POSplit operators in the given plan,
+     * the one whose first leaf is a POValueOutputTez that has the union
+     * operator key as one of its output keys.
+     *
+     * @param plan the physical plan containing the split(s)
+     * @param unionOpKey string form of the union operator's key
+     * @return the first matching split sub-plan
+     * @throws VisitorException if no sub-plan writes to the union operator
+     */
+    public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {
+        List<POSplit> splits = PlanHelper.getPhysicalOperators(plan, POSplit.class);
+        for (POSplit split : splits) {
+            for (PhysicalPlan subPlan : split.getPlans()) {
+                if (subPlan.getLeaves().get(0) instanceof POValueOutputTez) {
+                    POValueOutputTez out = (POValueOutputTez) subPlan.getLeaves().get(0);
+                    if (out.containsOutputKey(unionOpKey)) {
+                        return subPlan;
+                    }
+                }
+            }
+        }
+        throw new VisitorException("Did not find the union predecessor in the split plan");
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri May 30 19:07:23 2014
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.util.Pair;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Static helpers that carry MapReduce configuration settings over to their
+ * Tez equivalents. Not instantiable.
+ */
+@InterfaceAudience.Private
+public class MRToTezHelper {
+
+    private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
+
+    /**
+     * Keys which are used across an edge. i.e. by an Output-Input pair.
+     * Maps an MR setting to the pair (Tez output setting, Tez input setting)
+     * that both receive its value.
+     */
+    private static final Map<String, Pair<String, String>> mrToTezIOParamMap =
+            new HashMap<String, Pair<String, String>>();
+
+    /** MR settings that are copied unchanged by {@link #processMRSettings}. */
+    private static final List<String> mrSettingsToRetain = new ArrayList<String>();
+
+    private MRToTezHelper() {
+    }
+
+    static {
+        populateMRToTezIOParamMap();
+        populateMRSettingsToRetain();
+    }
+
+    /** Registers the MR map-output compression settings and their Tez output/input counterparts. */
+    private static void populateMRToTezIOParamMap() {
+        mrToTezIOParamMap.put(MRJobConfig.MAP_OUTPUT_COMPRESS,
+                new Pair<String, String> (
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED));
+
+        mrToTezIOParamMap.put(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+                new Pair<String, String> (
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC));
+
+    }
+
+    /** Registers the FileInputFormat/FileOutputFormat settings that must be kept as they are. */
+    private static void populateMRSettingsToRetain() {
+
+        // FileInputFormat
+        mrSettingsToRetain.add(FileInputFormat.INPUT_DIR);
+        mrSettingsToRetain.add(FileInputFormat.SPLIT_MAXSIZE);
+        mrSettingsToRetain.add(FileInputFormat.SPLIT_MINSIZE);
+        mrSettingsToRetain.add(FileInputFormat.PATHFILTER_CLASS);
+        mrSettingsToRetain.add(FileInputFormat.NUM_INPUT_FILES);
+        mrSettingsToRetain.add(FileInputFormat.INPUT_DIR_RECURSIVE);
+
+        // FileOutputFormat
+        mrSettingsToRetain.add("mapreduce.output.basename");
+        mrSettingsToRetain.add(FileOutputFormat.COMPRESS);
+        mrSettingsToRetain.add(FileOutputFormat.COMPRESS_CODEC);
+        mrSettingsToRetain.add(FileOutputFormat.COMPRESS_TYPE);
+        mrSettingsToRetain.add(FileOutputFormat.OUTDIR);
+        mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
+    }
+
+    /**
+     * Builds the configuration for the DAG application master from a
+     * configuration that may still carry MR settings. The input is not
+     * modified; a copy is returned in which every MR key known to
+     * {@link DeprecatedKeys#getMRToDAGParamMap()} has been replaced by its Tez
+     * key, and in which AM java opts, queue name, AM memory, AM vcores and
+     * max AM attempts are filled in from their MR counterparts when the Tez
+     * setting is not already present.
+     *
+     * @param tezConf the configuration to derive the AM configuration from
+     * @return a new configuration for the DAG application master
+     */
+    public static TezConfiguration getDAGAMConfFromMRConf(
+            TezConfiguration tezConf) {
+
+        // Set Tez parameters based on MR parameters.
+        TezConfiguration dagAMConf = new TezConfiguration(tezConf);
+        Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
+                .getMRToDAGParamMap();
+
+        for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+            String mrValue = dagAMConf.get(entry.getKey());
+            if (mrValue != null) {
+                dagAMConf.set(entry.getValue(), mrValue);
+                dagAMConf.unset(entry.getKey());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
+                            + " to Tez key: " + entry.getValue()
+                            + " with value " + dagAMConf.get(entry.getValue()));
+                }
+            }
+        }
+
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_JAVA_OPTS,
+                org.apache.tez.mapreduce.hadoop.MRHelpers
+                        .getMRAMJavaOpts(tezConf));
+
+        String queueName = tezConf.get(JobContext.QUEUE_NAME,
+                YarnConfiguration.DEFAULT_QUEUE_NAME);
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
+
+        int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB,
+                MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
+        int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
+                MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, ""
+                + amMemMB);
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
+                + amCores);
+
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
+                + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
+                        MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
+
+        return dagAMConf;
+    }
+
+    /**
+     * Process the mapreduce configuration settings and
+     *    - copy as is the still required ones (like those used by FileInputFormat/FileOutputFormat)
+     *    - convert and set equivalent tez runtime settings
+     *    - handle compression related settings
+     *
+     * @param conf Configuration on which the mapreduce settings will have to be transferred
+     * @param mrConf Configuration that contains mapreduce settings
+     */
+    public static void processMRSettings(Configuration conf, Configuration mrConf) {
+        for (String mrSetting : mrSettingsToRetain) {
+            String mrValue = mrConf.get(mrSetting);
+            if (mrValue != null) {
+                conf.set(mrSetting, mrValue);
+            }
+        }
+        JobControlCompiler.configureCompression(conf);
+        convertMRToTezRuntimeConf(conf, mrConf);
+    }
+
+    /**
+     * Convert MR settings to Tez settings and set on conf.
+     * <p>
+     * For every MR key present in {@code mrConf}, the MR key is removed from
+     * {@code conf} and the equivalent Tez key(s) are set on {@code conf}
+     * unless they already have a value.
+     *
+     * @param conf  Configuration on which MR equivalent Tez settings should be set
+     * @param mrConf Configuration that contains MR settings
+     */
+    private static void convertMRToTezRuntimeConf(Configuration conf, Configuration mrConf) {
+        for (Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) {
+            // Read the value before unsetting the MR key: if conf and mrConf are
+            // the same instance the value would otherwise be gone by the time it is used.
+            String mrValue = mrConf.get(dep.getKey());
+            if (mrValue != null) {
+                conf.unset(dep.getKey());
+                LOG.info("Setting " + dep.getValue() + " to "
+                        + mrValue + " from MR setting "
+                        + dep.getKey());
+                conf.setIfUnset(dep.getValue(), mrValue);
+            }
+        }
+
+        for (Entry<String, Pair<String, String>> dep : mrToTezIOParamMap.entrySet()) {
+            // Same ordering concern as above: capture the value first.
+            String mrValue = mrConf.get(dep.getKey());
+            if (mrValue != null) {
+                conf.unset(dep.getKey());
+                LOG.info("Setting " + dep.getValue() + " to "
+                        + mrValue + " from MR setting "
+                        + dep.getKey());
+                // The same MR value applies to both ends of the edge
+                conf.setIfUnset(dep.getValue().first, mrValue);
+                conf.setIfUnset(dep.getValue().second, mrValue);
+            }
+        }
+    }
+
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java Fri May 30 19:07:23 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.classification.InterfaceAudience;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.base.Charsets;
+
+/**
+ * This class duplicates some security related private methods from
+ * org.apache.hadoop.mapreduce.JobSubmitter for Tez.
+ */
+@InterfaceAudience.Private
+public class SecurityHelper {
+
+    private static final Log LOG = LogFactory.getLog(SecurityHelper.class);
+
+    private SecurityHelper() {
+    }
+
+    /**
+     * Adds to {@code credentials} the tokens and secret keys found in the
+     * files named by the {@code mapreduce.job.credentials.binary} and
+     * {@code mapreduce.job.credentials.json} settings. Either setting may be
+     * absent. A JSON file that cannot be parsed is reported with a warning and
+     * otherwise ignored.
+     *
+     * @param conf configuration naming the credential files
+     * @param credentials credentials object that receives the tokens and keys
+     * @throws IOException if a credentials file cannot be read
+     */
+    @SuppressWarnings("unchecked")
+    private static void readTokensFromFiles(Configuration conf,
+            Credentials credentials) throws IOException {
+        // add tokens and secrets coming from a token storage file
+        String binaryTokenFilename = conf
+                .get("mapreduce.job.credentials.binary");
+        if (binaryTokenFilename != null) {
+            Credentials binary = Credentials.readTokenStorageFile(new Path(
+                    "file:///" + binaryTokenFilename), conf);
+            credentials.addAll(binary);
+        }
+        // add secret keys coming from a json file
+        String tokensFileName = conf.get("mapreduce.job.credentials.json");
+        if (tokensFileName != null) {
+            LOG.info("loading user's secret keys from " + tokensFileName);
+            String localFileName = new Path(tokensFileName).toUri().getPath();
+
+            try {
+                // read JSON
+                ObjectMapper mapper = new ObjectMapper();
+                Map<String, String> nm = mapper.readValue(new File(
+                        localFileName), Map.class);
+
+                for (Map.Entry<String, String> ent : nm.entrySet()) {
+                    credentials.addSecretKey(new Text(ent.getKey()), ent
+                            .getValue().getBytes(Charsets.UTF_8));
+                }
+            } catch (JsonMappingException e) {
+                // Best effort: keep going, but log the cause so the bad file can be diagnosed
+                LOG.warn("couldn't parse Token Cache JSON file with user secret keys", e);
+            } catch (JsonParseException e) {
+                LOG.warn("couldn't parse Token Cache JSON file with user secret keys", e);
+            }
+        }
+    }
+
+    /**
+     * Get secret keys and tokens and store them into TokenCache.
+     * <p>
+     * First loads the credentials named in the configuration files, then
+     * obtains tokens for every namenode listed under
+     * {@link MRJobConfig#JOB_NAMENODES}, if any.
+     *
+     * @param conf configuration to read the credential settings from
+     * @param credentials credentials object that receives the tokens and keys
+     * @throws IOException if a credentials file cannot be read or tokens cannot be obtained
+     */
+    public static void populateTokenCache(Configuration conf,
+            Credentials credentials) throws IOException {
+        readTokensFromFiles(conf, credentials);
+        // add the delegation tokens from configuration
+        String[] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("adding the following namenodes' delegation tokens:"
+                    + Arrays.toString(nameNodes));
+        }
+        if (nameNodes != null) {
+            Path[] ps = new Path[nameNodes.length];
+            for (int i = 0; i < nameNodes.length; i++) {
+                ps[i] = new Path(nameNodes[i]);
+            }
+            TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
+        }
+    }
+
+}