You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/05/30 21:07:29 UTC
svn commit: r1598702 [12/23] - in /pig/trunk: ./ ivy/
shims/src/hadoop23/org/apache/pig/backend/hadoop23/
shims/test/hadoop20/org/apache/pig/test/
shims/test/hadoop23/org/apache/pig/test/ src/META-INF/services/
src/org/apache/pig/ src/org/apache/pig/ba...
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri May 30 19:07:23 2014
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.JarManager;
+
+/**
+ * Tracks the files Pig ships to Tez as YARN local resources. Files are copied
+ * into {@code stagingDir} on the FileSystem returned by
+ * {@code FileSystem.get(conf)} and registered under a resource name; that name
+ * is what the file is symlinked as in the container's working directory.
+ */
+public class TezResourceManager {
+    private final Path stagingDir;
+    private final PigContext pigContext;
+    private final URL bootStrapJar;
+    private final FileSystem remoteFs;
+    // Resource name -> fully qualified path of the shipped file on remoteFs.
+    public Map<String, Path> resources = new HashMap<String, Path>();
+
+    public URL getBootStrapJar() {
+        return bootStrapJar;
+    }
+
+    /**
+     * Creates the manager and immediately builds and ships the bootstrap job jar.
+     *
+     * @param stagingDir directory on the remote FS that shipped files are copied into
+     * @param pigContext context used to build the bootstrap jar
+     * @param conf configuration used to obtain the remote FileSystem
+     * @throws IOException if the remote FS cannot be obtained or the jar cannot be shipped
+     */
+    public TezResourceManager(Path stagingDir, PigContext pigContext, Configuration conf) throws IOException {
+        this.stagingDir = stagingDir;
+        this.pigContext = pigContext;
+        // The shipped bootstrap jar takes the file name of the jar containing Pig's Main class.
+        String jar = JarManager.findContainingJar(org.apache.pig.Main.class);
+        this.bootStrapJar = new File(jar).toURI().toURL();
+        this.remoteFs = FileSystem.get(conf);
+        addBootStrapJar();
+    }
+
+    /**
+     * Ships a local file to the staging directory and registers it under its
+     * file name. If a resource with that name is already registered, nothing is
+     * copied and the previously registered path is returned.
+     *
+     * @param url location of the local file
+     * @return qualified path of the resource on the remote FS
+     * @throws IOException if the copy fails
+     */
+    public Path addTezResource(URL url) throws IOException {
+        Path resourcePath = new Path(url.getFile());
+        String resourceName = resourcePath.getName();
+
+        if (resources.containsKey(resourceName)) {
+            return resources.get(resourceName);
+        }
+
+        // Ship the resource to the staging directory on the remote FS
+        Path remoteFsPath = remoteFs.makeQualified(new Path(stagingDir, resourceName));
+        remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
+        resources.put(resourceName, remoteFsPath);
+        return remoteFsPath;
+    }
+
+    /**
+     * Registers a file that is already present on the remote FS. The resource
+     * name may differ from the file name, which supports resource aliasing in a
+     * CACHE statement (and lets the same file be aliased under several names).
+     * An existing registration for {@code resourceName} is left untouched.
+     *
+     * @param resourceName name to expose the resource under
+     * @param remoteFsPath path of the file on the remote FS
+     */
+    public void addTezResource(String resourceName, Path remoteFsPath) throws IOException {
+        if (!resources.containsKey(resourceName)) {
+            resources.put(resourceName, remoteFsPath);
+        }
+    }
+
+    /**
+     * Ships every URL in {@code urls} (see {@link #addTezResource(URL)}) and
+     * returns the corresponding YARN local resources keyed by file name.
+     */
+    public Map<String, LocalResource> addTezResources(Set<URL> urls) throws Exception {
+        Set<String> resourceNames = new HashSet<String>();
+        for (URL url : urls) {
+            addTezResource(url);
+            resourceNames.add(new Path(url.getFile()).getName());
+        }
+        return getTezResources(resourceNames);
+    }
+
+    /**
+     * Builds the bootstrap job jar into a temp file and ships it to the staging
+     * directory, unless a resource with the bootstrap jar's name is already
+     * registered.
+     *
+     * @throws IOException if the jar cannot be written or copied
+     */
+    public void addBootStrapJar() throws IOException {
+        // resources is keyed by resource (file) name, not by URL.
+        String jarName = new Path(bootStrapJar.getFile()).getName();
+        if (resources.containsKey(jarName)) {
+            return;
+        }
+
+        File jobJar = File.createTempFile("Job", ".jar");
+        jobJar.deleteOnExit();
+        FileOutputStream fos = new FileOutputStream(jobJar);
+        try {
+            JarManager.createBootStrapJar(fos, pigContext);
+        } finally {
+            // Flush and release the temp file before it is copied to the remote FS.
+            fos.close();
+        }
+
+        // Ship the job.jar to the staging directory on the remote FS
+        Path remoteJarPath = remoteFs.makeQualified(new Path(stagingDir, jarName));
+        remoteFs.copyFromLocalFile(new Path(jobJar.getAbsolutePath()), remoteJarPath);
+        resources.put(jarName, remoteJarPath);
+    }
+
+    /**
+     * Builds YARN FILE local resources with APPLICATION visibility for the given
+     * registered resource names.
+     *
+     * @param resourceNames names previously registered with this manager
+     * @return resource name -> LocalResource describing the remote file
+     * @throws IOException if a name is not registered or its file status cannot be read
+     */
+    public Map<String, LocalResource> getTezResources(Set<String> resourceNames) throws Exception {
+        Map<String, LocalResource> tezResources = new HashMap<String, LocalResource>();
+        for (String resourceName : resourceNames) {
+            // The resource name will be symlinked to the resource path in the
+            // container's working directory.
+            Path resourcePath = resources.get(resourceName);
+            if (resourcePath == null) {
+                throw new IOException("Resource " + resourceName
+                        + " has not been added to TezResourceManager");
+            }
+            FileStatus fstat = remoteFs.getFileStatus(resourcePath);
+
+            LocalResource tezResource = LocalResource.newInstance(
+                    ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
+                    LocalResourceType.FILE,
+                    LocalResourceVisibility.APPLICATION,
+                    fstat.getLen(),
+                    fstat.getModificationTime());
+
+            tezResources.put(resourceName, tezResource);
+        }
+        return tezResources;
+    }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri May 30 19:07:23 2014
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
+import org.apache.pig.impl.PigContext;
+import org.apache.tez.client.AMConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezSession;
+import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.client.TezSessionStatus;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Static pool of Tez sessions. {@link #getSession} hands out a pooled session
+ * only if it is READY, not in use, and was started with every AM local
+ * resource the caller requests; otherwise it starts a new session. The pool
+ * operations in this class synchronize on {@code sessionPool}.
+ */
+public class TezSessionManager {
+
+    private static final Log log = LogFactory.getLog(TezSessionManager.class);
+
+    static {
+        // Stop every pooled session when the JVM shuts down.
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+
+            @Override
+            public void run() {
+                TezSessionManager.shutdown();
+            }
+        });
+    }
+
+    private TezSessionManager() {
+    }
+
+    /** A pooled session together with the AM resources it was started with. */
+    public static class SessionInfo {
+        SessionInfo(TezSession session, Map<String, LocalResource> resources) {
+            this.session = session;
+            this.resources = resources;
+        }
+        public Map<String, LocalResource> getResources() {
+            return resources;
+        }
+        public TezSession getTezSession() {
+            return session;
+        }
+        public void setInUse(boolean inUse) {
+            this.inUse = inUse;
+        }
+        private final TezSession session;
+        private final Map<String, LocalResource> resources;
+        // Read and written under the sessionPool lock by TezSessionManager.
+        private boolean inUse = false;
+    }
+
+    private static final List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
+
+    /**
+     * Polls the session every 100 ms until it is READY.
+     *
+     * @throws RuntimeException if the session reports SHUTDOWN
+     * @throws IOException if the thread is interrupted while waiting (the
+     *         interrupt flag is restored)
+     */
+    private static void waitForTezSessionReady(TezSession tezSession)
+            throws IOException, TezException {
+        while (true) {
+            TezSessionStatus status = tezSession.getSessionStatus();
+            if (status.equals(TezSessionStatus.SHUTDOWN)) {
+                //TODO: TEZ-1017 Show diagnostics message
+                //log.error("TezSession has already shutdown. Diagnostics: " + tezSession.getSessionDiagnostics());
+                throw new RuntimeException("TezSession has already shutdown");
+            }
+            if (status.equals(TezSessionStatus.READY)) {
+                return;
+            }
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so callers can still observe it.
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted while trying to check session status", e);
+            }
+        }
+    }
+
+    /**
+     * Starts a new Tez session with the requested AM resources and waits until
+     * it is READY. If it never becomes READY, the started session is stopped
+     * before the failure propagates.
+     */
+    private static SessionInfo createSession(Configuration conf, Map<String, LocalResource> requestedAMResources, Credentials creds) throws TezException, IOException {
+        TezConfiguration tezConf = new TezConfiguration(conf);
+        AMConfiguration amConfig = getAMConfig(tezConf, requestedAMResources, creds);
+        TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+
+        String jobName = conf.get(PigContext.JOB_NAME, "pig");
+        TezClient tezClient = new TezClient(tezConf);
+        ApplicationId appId = tezClient.createApplication();
+
+        TezSession tezSession = new TezSession(jobName, appId, sessionConfig);
+        tezSession.start();
+        boolean ready = false;
+        try {
+            waitForTezSessionReady(tezSession);
+            ready = true;
+        } finally {
+            if (!ready) {
+                // Don't leak a started session that never became usable.
+                stopQuietly(tezSession);
+            }
+        }
+        return new SessionInfo(tezSession, requestedAMResources);
+    }
+
+    /** Stops a session, logging (not propagating) any failure. */
+    private static void stopQuietly(TezSession session) {
+        try {
+            session.stop();
+        } catch (Exception e) {
+            log.error("Error shutting down Tez session " + session, e);
+        }
+    }
+
+    private static AMConfiguration getAMConfig(TezConfiguration tezConf, Map<String, LocalResource> resources, Credentials creds) {
+        TezConfiguration dagAMConf = MRToTezHelper.getDAGAMConfFromMRConf(tezConf);
+        Map<String, String> amEnv = new HashMap<String, String>();
+        MRHelpers.updateEnvironmentForMRAM(tezConf, amEnv);
+
+        AMConfiguration amConfig = new AMConfiguration(amEnv, resources, dagAMConf, creds);
+        return amConfig;
+    }
+
+    /**
+     * Returns true if every requested (name, resource) pair is also among the
+     * resources the session was started with.
+     */
+    private static boolean validateSessionResources(SessionInfo currentSession, Map<String, LocalResource> requestedAMResources) {
+        return currentSession.resources.entrySet().containsAll(requestedAMResources.entrySet());
+    }
+
+    /**
+     * Returns an idle READY session whose resources cover the request, marking
+     * it in use, or starts a new one. SHUTDOWN sessions found along the way are
+     * dropped from the pool.
+     */
+    static TezSession getSession(Configuration conf, Map<String, LocalResource> requestedAMResources, Credentials creds) throws TezException, IOException {
+        synchronized (sessionPool) {
+            Iterator<SessionInfo> iter = sessionPool.iterator();
+            while (iter.hasNext()) {
+                SessionInfo sessionInfo = iter.next();
+                // Query the status once so both checks below see the same value.
+                TezSessionStatus status = sessionInfo.session.getSessionStatus();
+                if (status == TezSessionStatus.SHUTDOWN) {
+                    iter.remove();
+                } else if (!sessionInfo.inUse && status == TezSessionStatus.READY
+                        && validateSessionResources(sessionInfo, requestedAMResources)) {
+                    sessionInfo.inUse = true;
+                    return sessionInfo.session;
+                }
+            }
+
+            // We cannot find available AM, create new one
+            SessionInfo sessionInfo = createSession(conf, requestedAMResources, creds);
+            sessionInfo.inUse = true;
+            sessionPool.add(sessionInfo);
+            return sessionInfo.session;
+        }
+    }
+
+    /** Marks a pooled session as no longer in use so it can be handed out again. */
+    static void freeSession(TezSession session) {
+        synchronized (sessionPool) {
+            for (SessionInfo sessionInfo : sessionPool) {
+                if (sessionInfo.session == session) {
+                    sessionInfo.inUse = false;
+                    break;
+                }
+            }
+        }
+    }
+
+    /** Stops a pooled session and removes it from the pool. */
+    static void stopSession(TezSession session) throws TezException, IOException {
+        synchronized (sessionPool) {
+            Iterator<SessionInfo> iter = sessionPool.iterator();
+            while (iter.hasNext()) {
+                SessionInfo sessionInfo = iter.next();
+                if (sessionInfo.session == session) {
+                    log.info("Shutting down Tez session " + session);
+                    session.stop();
+                    iter.remove();
+                    break;
+                }
+            }
+        }
+    }
+
+    /** Stops every pooled session (logging failures) and empties the pool. */
+    @VisibleForTesting
+    public static void shutdown() {
+        synchronized (sessionPool) {
+            for (SessionInfo sessionInfo : sessionPool) {
+                log.info("Shutting down Tez session " + sessionInfo.session);
+                stopQuietly(sessionInfo.session);
+            }
+            sessionPool.clear();
+        }
+    }
+}
+
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskConfigurable.java Fri May 30 19:07:23 2014
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+/**
+ * Implemented by PhysicalOperators that need access to the
+ * {@link TezProcessorContext} of the Tez task they run in.
+ */
+public interface TezTaskConfigurable {
+
+    /**
+     * Supplies the operator with the processor context of the current Tez task.
+     *
+     * @param processorContext processor context of the running Tez task
+     * @throws ExecException if the operator fails to initialize from the context
+     */
+    void initialize(TezProcessorContext processorContext) throws ExecException;
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezTaskContext.java Fri May 30 19:07:23 2014
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.pig.backend.hadoop.executionengine.TaskContext;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters.MRCounter;
+import org.apache.tez.runtime.api.TezProcessorContext;
+
+/**
+ * {@link TaskContext} backed by a Tez {@link TezProcessorContext}. Counter
+ * operations go through the processor context's counters; when the context is
+ * null, the getters return null and the increments return false.
+ */
+public class TezTaskContext extends TaskContext<TezProcessorContext> {
+    private final TezProcessorContext context;
+
+    public TezTaskContext(TezProcessorContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public TezProcessorContext get() {
+        return context;
+    }
+
+    @Override
+    public Counter getCounter(Enum<?> name) {
+        return (context == null) ? null : new MRCounter(findTezCounter(name));
+    }
+
+    @Override
+    public Counter getCounter(String group, String name) {
+        return (context == null) ? null : new MRCounter(findTezCounter(group, name));
+    }
+
+    @Override
+    public boolean incrCounter(Enum<?> name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        findTezCounter(name).increment(delta);
+        return true;
+    }
+
+    @Override
+    public boolean incrCounter(String group, String name, long delta) {
+        if (context == null) {
+            return false;
+        }
+        findTezCounter(group, name).increment(delta);
+        return true;
+    }
+
+    // Looks up the Tez counter for an enum key; requires a non-null context.
+    private TezCounter findTezCounter(Enum<?> name) {
+        return context.getCounters().findCounter(name);
+    }
+
+    // Looks up the Tez counter by group and name; requires a non-null context.
+    private TezCounter findTezCounter(String group, String name) {
+        return context.getCounters().getGroup(group).findCounter(name);
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalMap;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * Tez variant of {@link WeightedRangePartitioner}. It builds the quantiles and
+ * weighted partitions from {@code PigProcessor.sampleMap} and stores them in
+ * the {@link ObjectCache}, keyed by the sample vertex, so later calls to
+ * {@link #init()} can reuse them.
+ */
+public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
+    private static final Log LOG = LogFactory.getLog(WeightedRangePartitionerTez.class);
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void init() {
+        ObjectCache cache = ObjectCache.getInstance();
+        String keyPrefix = "sample-" + PigProcessor.sampleVertex;
+        String isCachedKey = keyPrefix + ".cached";
+        String quantilesCacheKey = keyPrefix + ".quantiles";
+        String weightedPartsCacheKey = keyPrefix + ".weightedParts";
+        // Compare by value; a reference check on a boxed Boolean is fragile.
+        if (Boolean.TRUE.equals(cache.retrieve(isCachedKey))) {
+            quantiles = (PigNullableWritable[]) cache
+                    .retrieve(quantilesCacheKey);
+            weightedParts = (Map<PigNullableWritable, DiscreteProbabilitySampleGenerator>) cache
+                    .retrieve(weightedPartsCacheKey);
+            LOG.info("Found quantiles and weightedParts in Tez cache. cachekey="
+                    + quantilesCacheKey + "," + weightedPartsCacheKey);
+            inited = true;
+            return;
+        }
+
+        // We've collected sampleMap in PigProcessor; read the static field once.
+        Map<String, Object> quantileMap = PigProcessor.sampleMap;
+        if (quantileMap == null) {
+            LOG.info("Quantiles map is empty");
+            inited = true;
+            return;
+        }
+
+        long start = System.currentTimeMillis();
+        try {
+            DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
+            InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
+            convertToArray(quantilesList);
+            for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
+                Tuple key = (Tuple) ent.getKey(); // sample item which repeats
+                float[] probVec = getProbVec((Tuple) ent.getValue());
+                weightedParts.put(getPigNullableWritable(key),
+                        new DiscreteProbabilitySampleGenerator(probVec));
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        LOG.info("Initialized WeightedRangePartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
+        cache.cache(isCachedKey, Boolean.TRUE);
+        cache.cache(quantilesCacheKey, quantiles);
+        cache.cache(weightedPartsCacheKey, weightedParts);
+        inited = true;
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterStatsTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+
+/**
+ * POCounterStatsTez gathers the per-task record counts emitted by the
+ * POCounterTez tasks of the previous vertex, converts them into per-task
+ * starting offsets, and writes those offsets to the output vertex.
+ */
+public class POCounterStatsTez extends PhysicalOperator implements TezInput, TezOutput {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POCounterStatsTez.class);
+    private String inputKey;
+    private String outputKey;
+    // TODO: Even though we expect only one record from POCounter, because of Shuffle we have
+    // KeyValuesReader. After TEZ-661, switch to unsorted shuffle
+    private transient KeyValuesReader reader;
+    private transient KeyValueWriter writer;
+
+    public POCounterStatsTez(OperatorKey k) {
+        super(k);
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        if (oldInputKey.equals(inputKey)) {
+            inputKey = newInputKey;
+        }
+    }
+
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf)
+            throws ExecException {
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+        try {
+            reader = (KeyValuesReader) input.getReader();
+            LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { outputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (oldOutputKey.equals(outputKey)) {
+            outputKey = newOutputKey;
+        }
+    }
+
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        LogicalOutput output = outputs.get(outputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + outputKey + " is missing");
+        }
+        try {
+            writer = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Reads every (task id, record count) pair from the input. It computes the
+     * offset at which each task's records start in the overall count and writes
+     * the offsets as a single-field tuple holding a map. Always returns EOP.
+     * <p>
+     * For example, if task 0 has 5 records, task 1 has 10 and task 2 has 3, the
+     * written map is {"0"=0, "1"=5, "2"=15}. A task id with no reported count
+     * is treated as having 0 records.
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            // Read count of records per task
+            Map<Integer, Long> counterRecords = new HashMap<Integer, Long>();
+            int maxTaskId = 0;
+            while (reader.next()) {
+                int taskId = ((IntWritable) reader.getCurrentKey()).get();
+                for (Object val : reader.getCurrentValues()) {
+                    counterRecords.put(taskId, ((LongWritable) val).get());
+                }
+                maxTaskId = Math.max(maxTaskId, taskId);
+            }
+
+            // BinInterSedes only takes String for map key
+            Map<String, Long> counterOffsets = new HashMap<String, Long>();
+            long offset = 0L;
+            for (int taskId = 0; taskId <= maxTaskId; taskId++) {
+                counterOffsets.put(String.valueOf(taskId), offset);
+                Long taskCount = counterRecords.get(taskId);
+                // Skip missing counts instead of unboxing null (which would NPE).
+                if (taskCount != null) {
+                    offset += taskCount;
+                }
+            }
+
+            Tuple tuple = TupleFactory.getInstance().newTuple(1);
+            tuple.set(0, counterOffsets);
+            writer.write(POValueOutputTez.EMPTY_KEY, tuple);
+            return RESULT_EOP;
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    public void setOutputKey(String outputKey) {
+        this.outputKey = outputKey;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public String name() {
+        // Label the operator with its own class name (was copy-pasted as PORankStatsTez).
+        return "POCounterStatsTez - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POCounterTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezTaskConfigurable;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KeyValueWriter;
+
+/**
+ * Tez implementation of {@link POCounter}.
+ * <p>
+ * Each input tuple gets its counter value attached and is written to the tuples output.
+ * Once the input is exhausted, the total number of records counted by this task is
+ * written to the stats output, keyed by the Tez task index.
+ */
+public class POCounterTez extends POCounter implements TezOutput, TezTaskConfigurable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POCounterTez.class);
+
+    private String tuplesOutputKey;
+    private String statsOutputKey;
+
+    private transient KeyValueWriter tuplesWriter;
+    private transient KeyValueWriter statsWriter;
+    // Records counted by this task; emitted as this task's stats record.
+    private transient long totalTaskRecords = 0;
+
+    public POCounterTez(POCounter copy) {
+        super(copy);
+    }
+
+    /**
+     * Stores the Tez task index, which becomes the key of the stats record.
+     */
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+            throws ExecException {
+        this.setTaskId(processorContext.getTaskIndex());
+    }
+
+    public void setTuplesOutputKey(String tuplesOutputKey) {
+        this.tuplesOutputKey = tuplesOutputKey;
+    }
+
+    public void setStatsOutputKey(String statsOutputKey) {
+        this.statsOutputKey = statsOutputKey;
+    }
+
+    @Override
+    public String[] getTezOutputs() {
+        return new String[] { tuplesOutputKey, statsOutputKey };
+    }
+
+    @Override
+    public void replaceOutput(String oldOutputKey, String newOutputKey) {
+        if (oldOutputKey.equals(tuplesOutputKey)) {
+            tuplesOutputKey = newOutputKey;
+        } else if (oldOutputKey.equals(statsOutputKey)) {
+            statsOutputKey = newOutputKey;
+        }
+    }
+
+    /**
+     * Binds the tuples writer and the stats writer, in that order.
+     *
+     * @throws ExecException if either output is missing or its writer cannot be obtained
+     */
+    @Override
+    public void attachOutputs(Map<String, LogicalOutput> outputs,
+            Configuration conf) throws ExecException {
+        tuplesWriter = getWriter(outputs, tuplesOutputKey);
+        statsWriter = getWriter(outputs, statsOutputKey);
+    }
+
+    /**
+     * Looks up the LogicalOutput registered under {@code outputKey} and returns its writer.
+     *
+     * @throws ExecException if the output is missing or its writer cannot be obtained
+     */
+    private static KeyValueWriter getWriter(Map<String, LogicalOutput> outputs,
+            String outputKey) throws ExecException {
+        LogicalOutput output = outputs.get(outputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + outputKey + " is missing");
+        }
+        try {
+            KeyValueWriter writer = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
+            return writer;
+        } catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    /**
+     * Drains the input, writing every non-null tuple (with its counter value) to the
+     * tuples output, and then writes this task's record count to the stats output.
+     *
+     * @return {@code RESULT_EOP} once the input is exhausted, or the STATUS_ERR result
+     *         from the input, propagated unchanged (no stats record is written then)
+     */
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            while (true) {
+                Result inp = processInput();
+                if (inp.returnStatus == POStatus.STATUS_ERR) {
+                    // Surface the failure instead of masking it as end-of-input and
+                    // publishing a record count for partially processed input.
+                    return inp;
+                }
+                if (inp.returnStatus == POStatus.STATUS_EOP) {
+                    break;
+                }
+                if (inp.returnStatus == POStatus.STATUS_NULL) {
+                    continue;
+                }
+                tuplesWriter.write(POValueOutputTez.EMPTY_KEY,
+                        addCounterValue(inp).result);
+            }
+
+            // Stats record: (task index, number of records counted by this task).
+            statsWriter.write(new IntWritable(this.getTaskId()), new LongWritable(totalTaskRecords));
+
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+        return RESULT_EOP;
+    }
+
+    @Override
+    protected Long incrementLocalCounter() {
+        totalTaskRecords++;
+        return super.incrementLocalCounter();
+    }
+
+    @Override
+    protected void addToLocalCounter(Long sizeBag) {
+        super.addToLocalCounter(sizeBag);
+        totalTaskRecords += sizeBag;
+    }
+
+    /**
+     * Adds {@code increment} to the task total.
+     * NOTE(review): does not call super, so POCounter's own reduce-counter handling is
+     * skipped; presumably intentional for Tez, confirm.
+     */
+    @Override
+    protected void incrementReduceCounter(Long increment) {
+        totalTaskRecords += increment;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        super.visit(v);
+        v.visit(this);
+    }
+
+    @Override
+    public String name() {
+        return "POCounterTez - " + mKey.toString() + "\t->\t " + tuplesOutputKey + "," + statsOutputKey;
+    }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/PORankTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.tez.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+public class PORankTez extends PORank implements TezInput {
+
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(PORankTez.class);
+
+ private String tuplesInputKey;
+ private String statsInputKey;
+ private transient boolean isInputCached;
+ private transient KeyValueReader reader;
+ private transient Map<Integer, Long> counterOffsets;
+ private transient Configuration conf;
+
+ public PORankTez(PORank copy) {
+ super(copy);
+ }
+
+ public void setTuplesInputKey(String tuplesInputKey) {
+ this.tuplesInputKey = tuplesInputKey;
+ }
+
+ public void setStatsInputKey(String statsInputKey) {
+ this.statsInputKey = statsInputKey;
+ }
+
+ @Override
+ public String[] getTezInputs() {
+ return new String[] { tuplesInputKey, statsInputKey };
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (oldInputKey.equals(tuplesInputKey)) {
+ tuplesInputKey = newInputKey;
+ } else if (oldInputKey.equals(statsInputKey)) {
+ statsInputKey = newInputKey;
+ }
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ String cacheKey = "rankstats-" + getOperatorKey().toString();
+ Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+ if (cacheValue != null) {
+ isInputCached = true;
+ inputsToSkip.add(statsInputKey);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException {
+ this.conf = conf;
+ LogicalInput input = inputs.get(tuplesInputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + tuplesInputKey + " is missing");
+ }
+ try {
+ reader = (KeyValueReader) input.getReader();
+ LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+
+ String cacheKey = "rankstats-" + getOperatorKey().toString();
+ if (isInputCached) {
+ counterOffsets = (Map<Integer, Long>) ObjectCache.getInstance().retrieve(cacheKey);
+ LOG.info("Found counter stats for PORankTez in Tez cache. cachekey=" + cacheKey);
+ return;
+ }
+ input = inputs.get(statsInputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + statsInputKey + " is missing");
+ }
+ try {
+ KeyValueReader reader = (KeyValueReader) input.getReader();
+ LOG.info("Attached input from vertex " + statsInputKey + " : input=" + input + ", reader=" + reader);
+ reader.next();
+ // POCounterStatsTez produces a HashMap which contains
+ // mapping of task id and the offset of record count in each task based on total record count
+ Map<String, Long> counterOffsetsTemp = (Map<String, Long>) ((Tuple)reader.getCurrentValue()).get(0);
+ counterOffsets = new HashMap<Integer, Long>(counterOffsetsTemp.size(), 1);
+ for (Entry<String, Long> entry : counterOffsetsTemp.entrySet()) {
+ counterOffsets.put(Integer.valueOf(entry.getKey()), entry.getValue());
+ }
+ ObjectCache.getInstance().cache(cacheKey, counterOffsets);
+ LOG.info("Cached PORankTez counter stats in Tez ObjectRegistry with vertex scope. cachekey=" + cacheKey);
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ Result inp = null;
+
+ try {
+ while (reader.next()) {
+ inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+ return addRank(inp);
+ }
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+
+ // For certain operators (such as STREAM), we could still have some work
+ // to do even after seeing the last input. These operators set a flag that
+ // says all input has been sent and to run the pipeline one more time.
+ if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
+ this.parentPlan.endOfAllInput = true;
+ }
+ return RESULT_EOP;
+ }
+
+ @Override
+ protected Long getRankCounterOffset(Integer taskId) {
+ if (illustrator != null) {
+ return 0L;
+ }
+ return counterOffsets.get(taskId);
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ super.visit(v);
+ v.visit(this);
+ }
+
+ @Override
+ public String name() {
+ return "PORankTez - " + mKey.toString() + "\t<-\t " + tuplesInputKey + "," + statsInputKey;
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/operators/POShuffledValueInputTez.java Fri May 30 19:07:23 2014
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez.operators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * POShuffledValueInputTez is used read tuples from a Tez Intermediate output from a shuffle edge
+ * To be used with POValueOutputTez.
+ * TODO: To be removed after PIG-3775 and TEZ-661
+ */
+public class POShuffledValueInputTez extends PhysicalOperator implements TezInput {
+
+ private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POShuffledValueInputTez.class);
+ private Set<String> inputKeys = new HashSet<String>();
+ private transient boolean finished = false;
+ private transient Iterator<KeyValueReader> readers;
+ private transient KeyValueReader currentReader;
+ protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+ private transient Configuration conf;
+
+ public POShuffledValueInputTez(OperatorKey k) {
+ super(k);
+ }
+
+ @Override
+ public String[] getTezInputs() {
+ return inputKeys.toArray(new String[inputKeys.size()]);
+ }
+
+ @Override
+ public void replaceInput(String oldInputKey, String newInputKey) {
+ if (inputKeys.remove(oldInputKey)) {
+ inputKeys.add(newInputKey);
+ }
+ }
+
+ @Override
+ public void addInputsToSkip(Set<String> inputsToSkip) {
+ }
+
+ @Override
+ public void attachInputs(Map<String, LogicalInput> inputs,
+ Configuration conf) throws ExecException {
+ this.conf = conf;
+ List<KeyValueReader> readersList = new ArrayList<KeyValueReader>();
+ try {
+ for (String inputKey : inputKeys) {
+ LogicalInput input = inputs.get(inputKey);
+ if (input == null) {
+ throw new ExecException("Input from vertex " + inputKey
+ + " is missing");
+ }
+
+ KeyValueReader reader = (KeyValueReader) input.getReader();
+ readersList.add(reader);
+ LOG.info("Attached input from vertex " + inputKey + " : input="
+ + input + ", reader=" + reader);
+ }
+ readers = readersList.iterator();
+ currentReader = readers.next();
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+ @Override
+ public Result getNextTuple() throws ExecException {
+ try {
+ if (finished) {
+ return RESULT_EOP;
+ }
+
+ do {
+ if (currentReader.next()) {
+ Tuple origTuple = (Tuple) currentReader.getCurrentValue();
+ Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ currentReader = readers.hasNext() ? readers.next() : null;
+ } while (currentReader != null);
+
+ finished = true;
+ // For certain operators (such as STREAM), we could still have some work
+ // to do even after seeing the last input. These operators set a flag that
+ // says all input has been sent and to run the pipeline one more time.
+ if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) {
+ this.parentPlan.endOfAllInput = true;
+ }
+ return RESULT_EOP;
+ } catch (IOException e) {
+ throw new ExecException(e);
+ }
+ }
+
+ public void addInputKey(String inputKey) {
+ this.inputKeys.add(inputKey);
+ }
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visit(this);
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ return false;
+ }
+
+ @Override
+ public String name() {
+ return "POShuffledValueInputTez - " + mKey.toString() + "\t<-\t " + inputKeys;
+ }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/NoopFilterRemover.java Fri May 30 19:07:23 2014
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.optimizers;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * For historical reasons splits will always produce filters that pass
+ * everything through unchanged. This optimizer removes these.
+ *
+ * The condition we look for is POFilters with a constant boolean
+ * (true) expression as its plan.
+ */
+public class NoopFilterRemover extends TezOpPlanVisitor {
+
+    private static final Log LOG = LogFactory.getLog(NoopFilterRemover.class);
+
+    public NoopFilterRemover(TezOperPlan plan) {
+        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    /**
+     * Removes always-true filters from operators that have a split parent,
+     * reconnecting each filter's neighbours around it.
+     */
+    @Override
+    public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        // Only operators split off a parent are considered.
+        if (tezOp.getSplitParent() == null) {
+            return;
+        }
+        try {
+            List<POFilter> filters = PlanHelper.getPhysicalOperators(tezOp.plan, POFilter.class);
+            for (POFilter filter : filters) {
+                if (isAlwaysTrue(filter)) {
+                    tezOp.plan.removeAndReconnect(filter);
+                }
+            }
+        } catch (PlanException e) {
+            throw new VisitorException(e);
+        }
+    }
+
+    /**
+     * Returns true when the filter's plan is a single constant expression equal to
+     * {@link Boolean#TRUE}.
+     */
+    private static boolean isAlwaysTrue(POFilter filter) {
+        PhysicalPlan filterPlan = filter.getPlan();
+        if (filterPlan.size() != 1) {
+            return false;
+        }
+        PhysicalOperator root = filterPlan.getRoots().get(0);
+        return root instanceof ConstantExpression
+                && Boolean.TRUE.equals(((ConstantExpression) root).getValue());
+    }
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Fri May 30 19:07:23 2014
@@ -0,0 +1,293 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.optimizers;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.POValueOutputTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.RoundRobinPartitioner;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator.VertexGroupInfo;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOutput;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+
+/**
+ * Optimizes union by removing the intermediate union vertex and making the
+ * successor get input from the predecessor vertices directly using VertexGroup.
+ * This should be run after MultiQueryOptimizer so that it handles cases like
+ * union followed by split and then store.
+ *
+ * For example:
+ * 1) Union followed by store
+ * Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + Store) will be optimized to
+ * Vertex 1 (Load + Store), Vertex 2 (Load + Store). Both the vertices will be writing output
+ * to same store location directly which is supported by Tez.
+ * 2) Union followed by groupby
+ * Vertex 1 (Load), Vertex 2 (Load) -> Vertex 3 (Union + POLocalRearrange) -> Vertex 4 (Group by)
+ * will be optimized to Vertex 1 (Load + POLR), Vertex 2 (Load + POLR) -> Vertex 4 (Group by)
+ *
+ * A union vertex that has more predecessors than vertex group members
+ * (e.g. a replicated join input broadcast to it) is left unchanged; see PIG-3856.
+ */
+public class UnionOptimizer extends TezOpPlanVisitor {
+
+ /**
+ * @param plan Tez operator plan to optimize; walked with a ReverseDependencyOrderWalker
+ */
+ public UnionOptimizer(TezOperPlan plan) {
+ super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ }
+
+ /**
+ * If tezOp is a union vertex: merges a clone of its plan into each vertex group member,
+ * replaces each of its stores and outputs with a new vertex group operator, moves its
+ * output edges onto the members, repoints successor inputs to the vertex groups, and
+ * finally removes the union vertex from the plan. Non-union operators are ignored.
+ *
+ * @throws VisitorException wrapping any failure while rewriting the plan
+ */
+ @Override
+ public void visitTezOp(TezOperator tezOp) throws VisitorException {
+ if (!tezOp.isUnion()) {
+ return;
+ }
+
+ TezOperator unionOp = tezOp;
+ String unionOpKey = unionOp.getOperatorKey().toString();
+ String scope = unionOp.getOperatorKey().scope;
+ TezOperPlan tezPlan = getPlan();
+
+ //TODO: PIG-3856 Handle replicated join. Replicate join input that was broadcast to union vertex
+ // now needs to be broadcast to all the union predecessors. How do we do that??
+ // Wait for shared edge and do it or write multiple times??
+ // For now don't optimize
+ // Create a copy as disconnect while iterating modifies the original list
+ // NOTE(review): the copy is not iterated here; only its size is used below.
+ List<TezOperator> predecessors = new ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
+ // Extra predecessors beyond the vertex group members (e.g. broadcast inputs): bail out.
+ if (predecessors.size() > unionOp.getVertexGroupMembers().size()) {
+ return;
+ }
+
+ PhysicalPlan unionOpPlan = unionOp.plan;
+
+ // Union followed by Split followed by Store could have multiple stores
+ // Create one store vertex group per POStoreTez, with the union's members as its members.
+ List<POStoreTez> unionStoreOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
+ TezOperator[] storeVertexGroupOps = new TezOperator[unionStoreOutputs.size()];
+ for (int i=0; i < storeVertexGroupOps.length; i++) {
+ storeVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
+ storeVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo(unionStoreOutputs.get(i)));
+ storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+ tezPlan.add(storeVertexGroupOps[i]);
+ }
+
+ // Case of split, orderby, skewed join, rank, etc will have multiple outputs
+ List<TezOutput> unionOutputs = PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class);
+ // One TezOutput can write to multiple LogicalOutputs (POCounterTez, POValueOutputTez, etc)
+ List<String> unionOutputKeys = new ArrayList<String>();
+ for (TezOutput output : unionOutputs) {
+ // Stores are handled by the store vertex groups created above.
+ if (output instanceof POStoreTez) {
+ continue;
+ }
+ for (String key : output.getTezOutputs()) {
+ unionOutputKeys.add(key);
+ }
+ }
+
+ // Create vertex group operator for each output
+ // newOutputKeys[i] is the key of outputVertexGroupOps[i], aligned with unionOutputKeys.
+ TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
+ String[] newOutputKeys = new String[unionOutputKeys.size()];
+ for (int i=0; i < outputVertexGroupOps.length; i++) {
+ outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
+ outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
+ outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
+ outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+ newOutputKeys[i] = outputVertexGroupOps[i].getOperatorKey().toString();
+ tezPlan.add(outputVertexGroupOps[i]);
+ }
+
+ try {
+
+ // Clone plan of union and merge it into the predecessor operators
+ // Remove POShuffledValueInputTez from union plan root
+ unionOpPlan.remove(unionOpPlan.getRoots().get(0));
+ for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
+ TezOperator pred = tezPlan.getOperator(predKey);
+ PhysicalPlan predPlan = pred.plan;
+ PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
+ // if predLeaf not POValueOutputTez
+ // (i.e. it is a POSplit): the union-bound output lives in one of the split's sub-plans.
+ if (predLeaf instanceof POSplit) {
+ // Find the subPlan that connects to the union operator
+ predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey);
+ predLeaf = predPlan.getLeaves().get(0);
+ }
+
+ PhysicalPlan clonePlan = unionOpPlan.clone();
+ //Clone changes the operator keys
+ List<POStoreTez> clonedUnionStoreOutputs = PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class);
+
+ // Remove POValueOutputTez from predecessor leaf
+ // then append the cloned union plan after whatever leaf remains (if any).
+ predPlan.remove(predLeaf);
+ boolean isEmptyPlan = predPlan.isEmpty();
+ if (!isEmptyPlan) {
+ predLeaf = predPlan.getLeaves().get(0);
+ }
+ predPlan.merge(clonePlan);
+ if (!isEmptyPlan) {
+ predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+ }
+
+ // Connect predecessor to the storeVertexGroups
+ // NOTE(review): assumes the clone lists its POStoreTez operators in the same order
+ // as unionStoreOutputs, so index i pairs with storeVertexGroupOps[i]; confirm.
+ int i = 0;
+ for (TezOperator storeVertexGroup : storeVertexGroupOps) {
+ storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+ //Set the output key of cloned POStore to that of the initial union POStore.
+ clonedUnionStoreOutputs.get(i).setOutputKey(
+ storeVertexGroup.getVertexGroupInfo().getStore()
+ .getOperatorKey().toString());
+ pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
+ storeVertexGroup.getOperatorKey());
+ tezPlan.connect(pred, storeVertexGroup);
+ }
+
+ for (TezOperator outputVertexGroup : outputVertexGroupOps) {
+ outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+ tezPlan.connect(pred, outputVertexGroup);
+ }
+
+ copyOperatorProperties(pred, unionOp);
+ tezPlan.disconnect(pred, unionOp);
+ }
+
+ List<TezOperator> successors = tezPlan.getSuccessors(unionOp);
+ List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
+ for (TezOutput tezOutput : unionOutputs) {
+ if (tezOutput instanceof POValueOutputTez) {
+ valueOnlyOutputs.add(tezOutput);
+ }
+ }
+ // Connect to outputVertexGroupOps
+ // Copy output edges of union -> successor to predecessor->successor, vertexgroup -> successor
+ // and connect vertexgroup -> successor in the plan.
+ for (Entry<OperatorKey, TezEdgeDescriptor> entry : unionOp.outEdges.entrySet()) {
+ TezOperator succOp = tezPlan.getOperator(entry.getKey());
+ // Case of union followed by union.
+ // unionOp.outEdges will not point to vertex group, but to its output.
+ // So find the vertex group if there is one.
+ TezOperator succOpVertexGroup = null;
+ for (TezOperator succ : successors) {
+ if (succ.isVertexGroup()
+ && succ.getVertexGroupInfo().getOutput()
+ .equals(succOp.getOperatorKey().toString())) {
+ succOpVertexGroup = succ;
+ break;
+ }
+ }
+ TezEdgeDescriptor edge = entry.getValue();
+ // Edge cannot be one to one as it will get input from two or
+ // more union predecessors. Change it to SCATTER_GATHER
+ if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
+ edge.dataMovementType = DataMovementType.SCATTER_GATHER;
+ edge.partitionerClass = RoundRobinPartitioner.class;
+ edge.outputClassName = OnFileSortedOutput.class.getName();
+ edge.inputClassName = ShuffledMergedInput.class.getName();
+
+ for (TezOutput tezOutput : valueOnlyOutputs) {
+ if (ArrayUtils.contains(tezOutput.getTezOutputs(), entry.getKey().toString())) {
+ edge.setIntermediateOutputKeyComparatorClass(
+ POValueOutputTez.EmptyWritableComparator.class.getName());
+ }
+ }
+ }
+ // NOTE(review): indexOf returns -1 if the edge key is not a union output key, which
+ // would raise ArrayIndexOutOfBoundsException (wrapped in VisitorException below).
+ TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
+ for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+ TezOperator pred = tezPlan.getOperator(predKey);
+ // Keep the output edge directly to successor
+ // Don't need to keep output edge for vertexgroup
+ pred.outEdges.put(entry.getKey(), edge);
+ succOp.inEdges.put(predKey, edge);
+ if (succOpVertexGroup != null) {
+ succOpVertexGroup.getVertexGroupMembers().add(predKey);
+ succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
+ // Connect directly to the successor vertex group
+ tezPlan.disconnect(pred, vertexGroupOp);
+ tezPlan.connect(pred, succOpVertexGroup);
+ }
+ }
+ if (succOpVertexGroup != null) {
+ succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+ succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
+ //Discard the new vertex group created
+ tezPlan.remove(vertexGroupOp);
+ } else {
+ tezPlan.connect(vertexGroupOp, succOp);
+ }
+ }
+ } catch (Exception e) {
+ throw new VisitorException(e);
+ }
+
+ List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
+ // Create a copy as disconnect while iterating modifies the original list
+ List<TezOperator> successors = succs == null ? null : new ArrayList<TezOperator>(succs);
+ if (successors != null) {
+ // Successor inputs should now point to the vertex groups.
+ for (TezOperator succ : successors) {
+ LinkedList<TezInput> inputs = PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
+ for (TezInput input : inputs) {
+ for (String key : input.getTezInputs()) {
+ if (key.equals(unionOpKey)) {
+ input.replaceInput(key,
+ newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]);
+ }
+ }
+ }
+ tezPlan.disconnect(unionOp, succ);
+ }
+ }
+
+ //Remove union operator from the plan
+ tezPlan.remove(unionOp);
+
+ }
+
+ /**
+ * Copies the union operator's secondary-key flag, UDFs and scalars onto a predecessor
+ * that now runs the union plan.
+ */
+ private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) {
+ pred.setUseSecondaryKey(unionOp.isUseSecondaryKey());
+ pred.UDFs.addAll(unionOp.UDFs);
+ pred.scalars.addAll(unionOp.scalars);
+ }
+
+ /**
+ * Finds the POSplit sub-plan whose POValueOutputTez leaf writes to the union operator.
+ *
+ * @param plan predecessor plan containing one or more POSplit operators
+ * @param unionOpKey operator key of the union vertex, as a string
+ * @return the matching sub-plan
+ * @throws VisitorException if no sub-plan outputs to the union
+ */
+ public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {
+ List<POSplit> splits = PlanHelper.getPhysicalOperators(plan, POSplit.class);
+ for (POSplit split : splits) {
+ for (PhysicalPlan subPlan : split.getPlans()) {
+ if (subPlan.getLeaves().get(0) instanceof POValueOutputTez) {
+ POValueOutputTez out = (POValueOutputTez) subPlan.getLeaves().get(0);
+ if (out.containsOutputKey(unionOpKey)) {
+ return subPlan;
+ }
+ }
+ }
+ }
+ throw new VisitorException("Did not find the union predecessor in the split plan");
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri May 30 19:07:23 2014
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.util.Pair;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+@InterfaceAudience.Private
+public class MRToTezHelper {
+
+ private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
+
+ /**
+ * Keys which are used across an edge. i.e. by an Output-Input pair.
+ */
+ private static Map<String, Pair<String, String>> mrToTezIOParamMap = new HashMap<String, Pair<String, String>>();
+
+ private static List<String> mrSettingsToRetain = new ArrayList<String>();
+
+ private MRToTezHelper() {
+ }
+
+ static {
+ populateMRToTezIOParamMap();
+ populateMRSettingsToRetain();
+ }
+
+ private static void populateMRToTezIOParamMap() {
+ mrToTezIOParamMap.put(MRJobConfig.MAP_OUTPUT_COMPRESS,
+ new Pair<String, String> (
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED));
+
+ mrToTezIOParamMap.put(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+ new Pair<String, String> (
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC));
+
+ }
+
+ private static void populateMRSettingsToRetain() {
+
+ // FileInputFormat
+ mrSettingsToRetain.add(FileInputFormat.INPUT_DIR);
+ mrSettingsToRetain.add(FileInputFormat.SPLIT_MAXSIZE);
+ mrSettingsToRetain.add(FileInputFormat.SPLIT_MINSIZE);
+ mrSettingsToRetain.add(FileInputFormat.PATHFILTER_CLASS);
+ mrSettingsToRetain.add(FileInputFormat.NUM_INPUT_FILES);
+ mrSettingsToRetain.add(FileInputFormat.INPUT_DIR_RECURSIVE);
+
+ // FileOutputFormat
+ mrSettingsToRetain.add("mapreduce.output.basename");
+ mrSettingsToRetain.add(FileOutputFormat.COMPRESS);
+ mrSettingsToRetain.add(FileOutputFormat.COMPRESS_CODEC);
+ mrSettingsToRetain.add(FileOutputFormat.COMPRESS_TYPE);
+ mrSettingsToRetain.add(FileOutputFormat.OUTDIR);
+ mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
+ }
+
+ public static TezConfiguration getDAGAMConfFromMRConf(
+ TezConfiguration tezConf) {
+
+ // Set Tez parameters based on MR parameters.
+ TezConfiguration dagAMConf = new TezConfiguration(tezConf);
+ Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
+ .getMRToDAGParamMap();
+
+ for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+ if (dagAMConf.get(entry.getKey()) != null) {
+ dagAMConf.set(entry.getValue(), dagAMConf.get(entry.getKey()));
+ dagAMConf.unset(entry.getKey());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
+ + " to Tez key: " + entry.getValue()
+ + " with value " + dagAMConf.get(entry.getValue()));
+ }
+ }
+ }
+
+ dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_JAVA_OPTS,
+ org.apache.tez.mapreduce.hadoop.MRHelpers
+ .getMRAMJavaOpts(tezConf));
+
+ String queueName = tezConf.get(JobContext.QUEUE_NAME,
+ YarnConfiguration.DEFAULT_QUEUE_NAME);
+ dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
+
+ int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB,
+ MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
+ int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
+ MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
+ dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, ""
+ + amMemMB);
+ dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
+ + amCores);
+
+ dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
+ + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
+ MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
+
+ return dagAMConf;
+ }
+
+ /**
+ * Process the mapreduce configuration settings and
+ * - copy as is the still required ones (like those used by FileInputFormat/FileOutputFormat)
+ * - convert and set equivalent tez runtime settings
+ * - handle compression related settings
+ *
+ * @param conf Configuration on which the mapreduce settings will have to be transferred
+ * @param mrConf Configuration that contains mapreduce settings
+ */
+ public static void processMRSettings(Configuration conf, Configuration mrConf) {
+ for (String mrSetting : mrSettingsToRetain) {
+ if (mrConf.get(mrSetting) != null) {
+ conf.set(mrSetting, mrConf.get(mrSetting));
+ }
+ }
+ JobControlCompiler.configureCompression(conf);
+ convertMRToTezRuntimeConf(conf, mrConf);
+ }
+
+ /**
+ * Convert MR settings to Tez settings and set on conf.
+ *
+ * @param conf Configuration on which MR equivalent Tez settings should be set
+ * @param mrConf Configuration that contains MR settings
+ */
+ private static void convertMRToTezRuntimeConf(Configuration conf, Configuration mrConf) {
+ for (Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) {
+ if (mrConf.get(dep.getKey()) != null) {
+ conf.unset(dep.getKey());
+ LOG.info("Setting " + dep.getValue() + " to "
+ + mrConf.get(dep.getKey()) + " from MR setting "
+ + dep.getKey());
+ conf.setIfUnset(dep.getValue(), mrConf.get(dep.getKey()));
+ }
+ }
+
+ for (Entry<String, Pair<String, String>> dep : mrToTezIOParamMap.entrySet()) {
+ if (mrConf.get(dep.getKey()) != null) {
+ conf.unset(dep.getKey());
+ LOG.info("Setting " + dep.getValue() + " to "
+ + mrConf.get(dep.getKey()) + " from MR setting "
+ + dep.getKey());
+ conf.setIfUnset(dep.getValue().first, mrConf.get(dep.getKey()));
+ conf.setIfUnset(dep.getValue().second, mrConf.get(dep.getKey()));
+ }
+ }
+ }
+
+}
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java?rev=1598702&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java Fri May 30 19:07:23 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.pig.classification.InterfaceAudience;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.base.Charsets;
+
+/**
+ * This class duplicates some security related private methods from
+ * org.apache.hadoop.mapreduce.JobSubmitter for Tez.
+ */
+@InterfaceAudience.Private
+public class SecurityHelper {
+
+ private static Log LOG = LogFactory.getLog(SecurityHelper.class);
+
+ private SecurityHelper() {
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void readTokensFromFiles(Configuration conf,
+ Credentials credentials) throws IOException {
+ // add tokens and secrets coming from a token storage file
+ String binaryTokenFilename = conf
+ .get("mapreduce.job.credentials.binary");
+ if (binaryTokenFilename != null) {
+ Credentials binary = Credentials.readTokenStorageFile(new Path(
+ "file:///" + binaryTokenFilename), conf);
+ credentials.addAll(binary);
+ }
+ // add secret keys coming from a json file
+ String tokensFileName = conf.get("mapreduce.job.credentials.json");
+ if (tokensFileName != null) {
+ LOG.info("loading user's secret keys from " + tokensFileName);
+ String localFileName = new Path(tokensFileName).toUri().getPath();
+
+ boolean json_error = false;
+ try {
+ // read JSON
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, String> nm = mapper.readValue(new File(
+ localFileName), Map.class);
+
+ for (Map.Entry<String, String> ent : nm.entrySet()) {
+ credentials.addSecretKey(new Text(ent.getKey()), ent
+ .getValue().getBytes(Charsets.UTF_8));
+ }
+ } catch (JsonMappingException e) {
+ json_error = true;
+ } catch (JsonParseException e) {
+ json_error = true;
+ }
+ if (json_error)
+ LOG.warn("couldn't parse Token Cache JSON file with user secret keys");
+ }
+ }
+
+ // get secret keys and tokens and store them into TokenCache
+ public static void populateTokenCache(Configuration conf,
+ Credentials credentials) throws IOException {
+ readTokensFromFiles(conf, credentials);
+ // add the delegation tokens from configuration
+ String[] nameNodes = conf.getStrings(MRJobConfig.JOB_NAMENODES);
+ LOG.debug("adding the following namenodes' delegation tokens:"
+ + Arrays.toString(nameNodes));
+ if (nameNodes != null) {
+ Path[] ps = new Path[nameNodes.length];
+ for (int i = 0; i < nameNodes.length; i++) {
+ ps[i] = new Path(nameNodes[i]);
+ }
+ TokenCache.obtainTokensForNamenodes(credentials, ps, conf);
+ }
+ }
+
+}