You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:44 UTC
[37/50] [abbrv] Rename tez-engine-api to tez-runtime-api and
tez-engine is split into 2: - tez-engine-library for user-visible
Input/Output/Processor implementations - tez-engine-internals for framework
internals
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/Fetcher.java
deleted file mode 100644
index a353416..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/Fetcher.java
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.crypto.SecretKey;
-import javax.net.ssl.HttpsURLConnection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.security.SecureShuffleUtils;
-import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.IFileInputStream;
-import org.apache.tez.engine.shuffle.common.FetchedInput.Type;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Responsible for fetching inputs served by the ShuffleHandler for a single
- * host. Construct using {@link FetcherBuilder}
- */
-public class Fetcher implements Callable<FetchResult> {
-
- private static final Log LOG = LogFactory.getLog(Fetcher.class);
-
- // Upper bound (in ms) on the timeout used for a single connect attempt; see connect().
- private static final int UNIT_CONNECT_TIMEOUT = 60 * 1000;
- // Source of the per-instance fetcherIdentifier assigned in the constructor.
- private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
-
- // Configurable fields.
- // Set through FetcherBuilder.setCompressionParameters(); a null codec means
- // shuffleToMemory() reads the stream without decompression.
- private CompressionCodec codec;
- private Decompressor decompressor;
- // Set through FetcherBuilder.setConnectionParameters(); passed to connect() and
- // HttpURLConnection.setReadTimeout() respectively.
- private int connectionTimeout;
- private int readTimeout;
-
- // Key used to hash the request URL and to verify the server's reply hash.
- private final SecretKey shuffleSecret;
- private final Configuration conf;
-
- // Receives fetchSucceeded / fetchFailed notifications for each input.
- private final FetcherCallback fetcherCallback;
- // Allocates the FetchedInput that each fetched payload is written into.
- private final FetchedInputAllocator inputManager;
- private final ApplicationId appId;
-
- // Class-wide SSL state, initialised at most once by the constructor while
- // holding the Fetcher.class monitor.
- private static boolean sslShuffle;
- private static SSLFactory sslFactory;
- private static boolean sslFactoryInited;
-
- // Identity of this fetcher; the only field used by hashCode() and equals().
- private final int fetcherIdentifier;
-
- // Parameters to track work.
- // All four are assigned together by FetcherBuilder.assignWork().
- private List<InputAttemptIdentifier> srcAttempts;
- private String host;
- private int port;
- private int partition;
-
- // Maps from the pathComponents (unique per srcTaskId) to the specific taskId
- // Populated by call() and consulted by fetchInputs() when a header is read.
- private Map<String, InputAttemptIdentifier> pathToAttemptMap;
- // Attempts not yet fetched or reported as failed during the current call().
- private Set<InputAttemptIdentifier> remaining;
-
- // Request state recorded by connectToShuffleHandler() and reused by call()
- // when validating the response.
- private URL url;
- private String encHash;
- private String msgToEncode;
-
- private Fetcher(FetcherCallback fetcherCallback,
- FetchedInputAllocator inputManager, ApplicationId appId, SecretKey shuffleSecret,
- Configuration conf) {
- this.fetcherCallback = fetcherCallback;
- this.inputManager = inputManager;
- this.shuffleSecret = shuffleSecret;
- this.appId = appId;
- this.conf = conf;
-
- this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
-
- // TODO NEWTEZ Ideally, move this out from here into a static initializer block.
- synchronized (Fetcher.class) {
- if (!sslFactoryInited) {
- sslFactoryInited = true;
- sslShuffle = conf.getBoolean(
- TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL,
- TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
- if (sslShuffle) {
- sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- try {
- sslFactory.init();
- } catch (Exception ex) {
- sslFactory.destroy();
- throw new RuntimeException(ex);
- }
- }
- }
- }
- }
-
-  /**
-   * Fetches the assigned inputs from the configured host over a single HTTP
-   * connection and reports the outcome of each one to the callback.
-   *
-   * <ul>
-   * <li>If the connection cannot be established, every input is reported with
-   * {@code connectFailed = true}.</li>
-   * <li>If the response cannot be opened or validated, only the first
-   * requested input is reported as failed.</li>
-   * <li>Otherwise inputs are read one by one until all are fetched or one read
-   * fails, in which case the failed inputs are reported.</li>
-   * </ul>
-   *
-   * @return a {@link FetchResult} built from host, port, partition and the
-   *         inputs that were neither fetched nor reported as failed
-   * @throws IOException if the server stream ended cleanly without delivering
-   *         every requested input
-   */
-  @Override
-  public FetchResult call() throws Exception {
-    if (srcAttempts.size() == 0) {
-      return new FetchResult(host, port, partition, srcAttempts);
-    }
-
-    // No other code in this class allocates the lookup table, so it must be
-    // created here before use; otherwise the put() below fails with a
-    // NullPointerException. Fully qualified because java.util.HashMap is not
-    // among this file's imports.
-    pathToAttemptMap = new java.util.HashMap<String, InputAttemptIdentifier>();
-    for (InputAttemptIdentifier in : srcAttempts) {
-      pathToAttemptMap.put(in.getPathComponent(), in);
-    }
-
-    remaining = new HashSet<InputAttemptIdentifier>(srcAttempts);
-
-    HttpURLConnection connection;
-    try {
-      connection = connectToShuffleHandler(host, port, partition, srcAttempts);
-    } catch (IOException e) {
-      // ioErrs.increment(1);
-      // If connect did not succeed, just mark all the maps as failed,
-      // indirectly penalizing the host
-      for (Iterator<InputAttemptIdentifier> leftIter = remaining.iterator(); leftIter
-          .hasNext();) {
-        fetcherCallback.fetchFailed(host, leftIter.next(), true);
-        leftIter.remove();
-      }
-      return new FetchResult(host, port, partition, remaining);
-    }
-
-    DataInputStream input = null;
-
-    try {
-      input = new DataInputStream(connection.getInputStream());
-      validateConnectionResponse(connection, url, msgToEncode, encHash);
-    } catch (IOException e) {
-      // The stream may already be open when validation fails; release it.
-      // IOUtils.cleanup ignores null arguments.
-      IOUtils.cleanup(LOG, input);
-      // ioErrs.increment(1);
-      // If we got a read error at this stage, it implies there was a problem
-      // with the first map, typically lost map. So, penalize only that map
-      // and add the rest
-      InputAttemptIdentifier firstAttempt = srcAttempts.get(0);
-      fetcherCallback.fetchFailed(host, firstAttempt, false);
-      remaining.remove(firstAttempt);
-      return new FetchResult(host, port, partition, remaining);
-    }
-
-    // By this point, the connection is setup and the response has been
-    // validated.
-
-    // Loop through available map-outputs and fetch them.
-    // On any error, failedInputs is not null and we exit after marking the
-    // failed tasks; whatever is still in 'remaining' goes back to the caller.
-    InputAttemptIdentifier[] failedInputs = null;
-    try {
-      while (!remaining.isEmpty() && failedInputs == null) {
-        failedInputs = fetchInputs(input);
-      }
-    } finally {
-      // Close the stream even if fetchInputs() fails with an unchecked exception.
-      IOUtils.cleanup(LOG, input);
-    }
-
-    if (failedInputs != null && failedInputs.length > 0) {
-      LOG.warn("copyInputs failed for tasks " + Arrays.toString(failedInputs));
-      for (InputAttemptIdentifier left : failedInputs) {
-        fetcherCallback.fetchFailed(host, left, false);
-        remaining.remove(left);
-      }
-    }
-
-    // Sanity check
-    if (failedInputs == null && !remaining.isEmpty()) {
-      throw new IOException("server didn't return all expected map outputs: "
-          + remaining.size() + " left.");
-    }
-
-    return new FetchResult(host, port, partition, remaining);
-
-  }
-
-  /**
-   * Reads one shuffle header plus its payload from {@code input} and stores
-   * the payload in a {@link FetchedInput} obtained from the input manager.
-   *
-   * @param input the validated response stream of the shuffle handler
-   * @return {@code null} when the input was fetched successfully; otherwise
-   *         the attempts that must be reported as failed. When the offending
-   *         attempt cannot be identified, all remaining attempts are returned.
-   */
-  private InputAttemptIdentifier[] fetchInputs(DataInputStream input) {
-    FetchedInput fetchedInput = null;
-    InputAttemptIdentifier srcAttemptId = null;
-    long decompressedLength = -1;
-    long compressedLength = -1;
-
-    try {
-      long startTime = System.currentTimeMillis();
-      int responsePartition = -1;
-      String pathComponent = null;
-      // Read the shuffle header
-      try {
-        ShuffleHeader header = new ShuffleHeader();
-        header.readFields(input);
-        pathComponent = header.getMapId();
-
-        srcAttemptId = pathToAttemptMap.get(pathComponent);
-        compressedLength = header.getCompressedLength();
-        decompressedLength = header.getUncompressedLength();
-        responsePartition = header.getPartition();
-      } catch (IllegalArgumentException e) {
-        // badIdErrs.increment(1);
-        LOG.warn("Invalid src id ", e);
-        // Don't know which one was bad, so consider all of them as bad
-        return allRemainingInputs();
-      }
-
-      // The header named a path component that was never requested. There is
-      // no attempt to blame, so treat it like an unreadable id instead of
-      // handing a null attempt to the callback.
-      if (srcAttemptId == null) {
-        LOG.warn("Invalid input. Received output for unknown path component "
-            + pathComponent);
-        return allRemainingInputs();
-      }
-
-      // Do some basic sanity verification
-      if (!verifySanity(compressedLength, decompressedLength,
-          responsePartition, srcAttemptId)) {
-        return new InputAttemptIdentifier[] { srcAttemptId };
-      }
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength
-            + ", decomp len: " + decompressedLength);
-      }
-
-      // Get the location for the map output - either in-memory or on-disk
-      fetchedInput = inputManager.allocate(decompressedLength, srcAttemptId);
-
-      // TODO NEWTEZ No concept of WAIT at the moment.
-      // // Check if we can shuffle *now* ...
-      // if (fetchedInput.getType() == FetchedInput.WAIT) {
-      // LOG.info("fetcher#" + id +
-      // " - MergerManager returned Status.WAIT ...");
-      // //Not an error but wait to process data.
-      // return EMPTY_ATTEMPT_ID_ARRAY;
-      // }
-
-      // Go!
-      LOG.info("fetcher" + " about to shuffle output of srcAttempt "
-          + fetchedInput.getInputAttemptIdentifier() + " decomp: "
-          + decompressedLength + " len: " + compressedLength + " to "
-          + fetchedInput.getType());
-
-      if (fetchedInput.getType() == Type.MEMORY) {
-        shuffleToMemory((MemoryFetchedInput) fetchedInput, input,
-            (int) decompressedLength, (int) compressedLength);
-      } else {
-        shuffleToDisk((DiskFetchedInput) fetchedInput, input, compressedLength);
-      }
-
-      // Inform the shuffle scheduler
-      long endTime = System.currentTimeMillis();
-      fetcherCallback.fetchSucceeded(host, srcAttemptId, fetchedInput,
-          compressedLength, (endTime - startTime));
-
-      // Note successful shuffle
-      remaining.remove(srcAttemptId);
-      // metrics.successFetch();
-      return null;
-    } catch (IOException ioe) {
-      // ioErrs.increment(1);
-      if (srcAttemptId == null || fetchedInput == null) {
-        LOG.info("fetcher" + " failed to read map header" + srcAttemptId
-            + " decomp: " + decompressedLength + ", " + compressedLength, ioe);
-        if (srcAttemptId == null) {
-          return allRemainingInputs();
-        } else {
-          return new InputAttemptIdentifier[] { srcAttemptId };
-        }
-      }
-      LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + host,
-          ioe);
-
-      // Inform the shuffle-scheduler
-      try {
-        fetchedInput.abort();
-      } catch (IOException e) {
-        // Best effort: the fetch is reported as failed either way, but keep
-        // the cause in the log.
-        LOG.info("Failure to cleanup fetchedInput: " + fetchedInput, e);
-      }
-      // metrics.failedFetch();
-      return new InputAttemptIdentifier[] { srcAttemptId };
-    }
-  }
-
-  /** Snapshot of every attempt that is still outstanding, as an array. */
-  private InputAttemptIdentifier[] allRemainingInputs() {
-    return remaining.toArray(new InputAttemptIdentifier[remaining.size()]);
-  }
-
- @SuppressWarnings("resource")
- private void shuffleToMemory(MemoryFetchedInput fetchedInput,
- InputStream input, int decompressedLength, int compressedLength)
- throws IOException {
- IFileInputStream checksumIn = new IFileInputStream(input, compressedLength,
- conf);
-
- input = checksumIn;
-
- // Are map-outputs compressed?
- if (codec != null) {
- decompressor.reset();
- input = codec.createInputStream(input, decompressor);
- }
- // Copy map-output into an in-memory buffer
- byte[] shuffleData = fetchedInput.getBytes();
-
- try {
- IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
- // metrics.inputBytes(shuffleData.length);
- LOG.info("Read " + shuffleData.length + " bytes from input for "
- + fetchedInput.getInputAttemptIdentifier());
- } catch (IOException ioe) {
- // Close the streams
- IOUtils.cleanup(LOG, input);
- // Re-throw
- throw ioe;
- }
- }
-
-  /**
-   * Streams exactly {@code compressedLength} bytes from {@code input} to the
-   * output stream of {@code fetchedInput}, unmodified, and closes that output
-   * stream once the copy is complete.
-   *
-   * @param fetchedInput on-disk destination
-   * @param input response stream positioned at the start of the payload
-   * @param compressedLength number of bytes to copy
-   * @throws IOException if the stream ends early or a read / write / close
-   *         fails; both streams are closed before the exception is rethrown
-   */
-  private void shuffleToDisk(DiskFetchedInput fetchedInput, InputStream input,
-      long compressedLength) throws IOException {
-    final int chunkSize = 64 * 1024;
-    OutputStream sink = fetchedInput.getOutputStream();
-    long pending = compressedLength;
-    try {
-      final byte[] chunk = new byte[chunkSize];
-      while (pending > 0) {
-        int wanted = (int) Math.min(pending, chunkSize);
-        int got = input.read(chunk, 0, wanted);
-        if (got < 0) {
-          throw new IOException("read past end of stream reading "
-              + fetchedInput.getInputAttemptIdentifier());
-        }
-        sink.write(chunk, 0, got);
-        pending -= got;
-        // metrics.inputBytes(n);
-      }
-
-      long copied = compressedLength - pending;
-      LOG.info("Read " + copied
-          + " bytes from input for " + fetchedInput.getInputAttemptIdentifier());
-
-      sink.close();
-    } catch (IOException ioe) {
-      // Close both ends before propagating the failure.
-      IOUtils.cleanup(LOG, input, sink);
-      throw ioe;
-    }
-
-    // Sanity check
-    if (pending != 0) {
-      throw new IOException("Incomplete input received for "
-          + fetchedInput.getInputAttemptIdentifier() + " from " + host + " ("
-          + pending + " bytes missing of " + compressedLength + ")");
-    }
-  }
-
-  /**
-   * Defensive validation of the values read from a shuffle header. Each
-   * failed check is logged at WARN level.
-   *
-   * @param compressedLength compressed length from the header; must not be negative
-   * @param decompressedLength uncompressed length from the header; must not be negative
-   * @param fetchPartition partition from the header; must equal this fetcher's partition
-   * @param srcAttemptId attempt resolved from the header; must still be outstanding
-   * @return {@code true} if every check passed, {@code false} otherwise
-   */
-  private boolean verifySanity(long compressedLength, long decompressedLength,
-      int fetchPartition, InputAttemptIdentifier srcAttemptId) {
-    boolean lengthsValid = compressedLength >= 0 && decompressedLength >= 0;
-    if (!lengthsValid) {
-      // wrongLengthErrs.increment(1);
-      LOG.warn(" invalid lengths in input header: id: " + srcAttemptId
-          + " len: " + compressedLength + ", decomp len: " + decompressedLength);
-      return false;
-    }
-
-    boolean partitionMatches = fetchPartition == this.partition;
-    if (!partitionMatches) {
-      // wrongReduceErrs.increment(1);
-      LOG.warn(" data for the wrong reduce map: " + srcAttemptId + " len: "
-          + compressedLength + " decomp len: " + decompressedLength
-          + " for reduce " + fetchPartition);
-      return false;
-    }
-
-    // The attempt must be one that is still waiting to be fetched.
-    boolean expected = remaining.contains(srcAttemptId);
-    if (!expected) {
-      // wrongMapErrs.increment(1);
-      LOG.warn("Invalid input. Received output for " + srcAttemptId);
-    }
-    return expected;
-  }
-
- private HttpURLConnection connectToShuffleHandler(String host, int port,
- int partition, List<InputAttemptIdentifier> inputs) throws IOException {
- try {
- this.url = constructInputURL(host, port, partition, inputs);
- HttpURLConnection connection = openConnection(url);
-
- // generate hash of the url
- this.msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- this.encHash = SecureShuffleUtils.hashFromString(msgToEncode,
- shuffleSecret);
-
- // put url hash into http header
- connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
- encHash);
- // set the read timeout
- connection.setReadTimeout(readTimeout);
- // put shuffle version into http header
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
- ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
- ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-
- connect(connection, connectionTimeout);
- return connection;
- } catch (IOException e) {
- LOG.warn("Failed to connect to " + host + " with " + srcAttempts.size()
- + " inputs", e);
- throw e;
- }
- }
-
- private void validateConnectionResponse(HttpURLConnection connection,
- URL url, String msgToEncode, String encHash) throws IOException {
- int rc = connection.getResponseCode();
- if (rc != HttpURLConnection.HTTP_OK) {
- throw new IOException("Got invalid response code " + rc + " from " + url
- + ": " + connection.getResponseMessage());
- }
-
- if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
- .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
- || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
- .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
- throw new IOException("Incompatible shuffle response version");
- }
-
- // get the replyHash which is HMac of the encHash we sent to the server
- String replyHash = connection
- .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
- if (replyHash == null) {
- throw new IOException("security validation of TT Map output failed");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
- + replyHash);
- }
- // verify that replyHash is HMac of encHash
- SecureShuffleUtils.verifyReply(replyHash, encHash, shuffleSecret);
- LOG.info("for url=" + msgToEncode + " sent hash and receievd reply");
- }
-
- protected HttpURLConnection openConnection(URL url) throws IOException {
- HttpURLConnection conn = (HttpURLConnection) url.openConnection();
- if (sslShuffle) {
- HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
- try {
- httpsConn.setSSLSocketFactory(sslFactory.createSSLSocketFactory());
- } catch (GeneralSecurityException ex) {
- throw new IOException(ex);
- }
- httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
- }
- return conn;
- }
-
-  /**
-   * Connects with retries. Rather than a single attempt using the whole
-   * timeout, several attempts are made, each with a timeout of at most
-   * {@code UNIT_CONNECT_TIMEOUT}; the last failure is rethrown once the total
-   * budget has been used up.
-   *
-   * @param connection the connection to establish
-   * @param connectionTimeout total connect budget in milliseconds; {@code 0}
-   *        results in a single attempt with a connect timeout of {@code 0}
-   * @throws IOException if the timeout is negative, or if the final attempt fails
-   */
-  private void connect(URLConnection connection, int connectionTimeout)
-      throws IOException {
-    if (connectionTimeout < 0) {
-      throw new IOException("Invalid timeout " + "[timeout = "
-          + connectionTimeout + " ms]");
-    }
-
-    int budget = connectionTimeout;
-    int slice = (budget > 0) ? Math.min(UNIT_CONNECT_TIMEOUT, budget) : 0;
-    connection.setConnectTimeout(slice);
-
-    for (;;) {
-      try {
-        connection.connect();
-        return;
-      } catch (IOException ioe) {
-        // Charge the failed attempt against the overall budget.
-        budget -= slice;
-
-        // Nothing left: surface the most recent failure.
-        if (budget == 0) {
-          throw ioe;
-        }
-
-        // Less than one full slice left: shrink the timeout for the final try.
-        if (budget < slice) {
-          slice = budget;
-          connection.setConnectTimeout(slice);
-        }
-      }
-    }
-  }
-
-  /**
-   * Builds the fetch URL: the base URI from {@link ShuffleUtils} followed by
-   * the comma-separated path components of {@code inputs}.
-   *
-   * @param host shuffle handler host
-   * @param port shuffle handler port
-   * @param partition partition to request
-   * @param inputs attempts whose output is requested
-   * @return the URL to fetch from
-   * @throws MalformedURLException if the assembled string is not a valid URL
-   */
-  private URL constructInputURL(String host, int port, int partition,
-      List<InputAttemptIdentifier> inputs) throws MalformedURLException {
-    StringBuilder target = ShuffleUtils.constructBaseURIForShuffleHandler(host,
-        port, partition, appId);
-    // Empty before the first element, a comma before every later one.
-    String separator = "";
-    for (InputAttemptIdentifier attempt : inputs) {
-      target.append(separator).append(attempt.getPathComponent());
-      separator = ",";
-    }
-    String spec = target.toString();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("InputFetch URL for: " + host + " : " + spec);
-    }
-    return new URL(spec);
-  }
-
-  /**
-   * Fluent builder for {@link Fetcher}. The fetcher is created up front and
-   * configured in place by the setters; {@link #build()} hands out that same
-   * instance and requires that {@link #assignWork} was called first.
-   */
-  public static class FetcherBuilder {
-    private Fetcher fetcher;
-    private boolean workAssigned = false;
-
-    public FetcherBuilder(FetcherCallback fetcherCallback,
-        FetchedInputAllocator inputManager, ApplicationId appId,
-        SecretKey shuffleSecret, Configuration conf) {
-      fetcher = new Fetcher(fetcherCallback, inputManager, appId,
-          shuffleSecret, conf);
-    }
-
-    /** Sets the codec and decompressor used when shuffling to memory. */
-    public FetcherBuilder setCompressionParameters(CompressionCodec codec,
-        Decompressor decompressor) {
-      this.fetcher.codec = codec;
-      this.fetcher.decompressor = decompressor;
-      return this;
-    }
-
-    /** Sets the total connect timeout and the read timeout. */
-    public FetcherBuilder setConnectionParameters(int connectionTimeout,
-        int readTimeout) {
-      this.fetcher.connectionTimeout = connectionTimeout;
-      this.fetcher.readTimeout = readTimeout;
-      return this;
-    }
-
-    /** Assigns the host, port, partition and inputs the fetcher works on. */
-    public FetcherBuilder assignWork(String host, int port, int partition,
-        List<InputAttemptIdentifier> inputs) {
-      this.fetcher.host = host;
-      this.fetcher.port = port;
-      this.fetcher.partition = partition;
-      this.fetcher.srcAttempts = inputs;
-      this.workAssigned = true;
-      return this;
-    }
-
-    /**
-     * @return the configured fetcher
-     * @throws IllegalStateException if no work has been assigned
-     */
-    public Fetcher build() {
-      Preconditions.checkState(workAssigned,
-          "Cannot build a fetcher withot assigning work to it");
-      return fetcher;
-    }
-  }
-
-  /** The fetcher's identifier doubles as its hash code. */
-  @Override
-  public int hashCode() {
-    return this.fetcherIdentifier;
-  }
-
-  /**
-   * Two fetchers are equal when they are of the same class and carry the same
-   * fetcher identifier.
-   */
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    return fetcherIdentifier == ((Fetcher) obj).fetcherIdentifier;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetcherCallback.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetcherCallback.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetcherCallback.java
deleted file mode 100644
index f0b7cd2..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/FetcherCallback.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-public interface FetcherCallback {
-
- public void fetchSucceeded(String host, InputAttemptIdentifier srcAttemptIdentifier, FetchedInput fetchedInput, long fetchedBytes, long copyDuration) throws IOException;
-
- public void fetchFailed(String host, InputAttemptIdentifier srcAttemptIdentifier, boolean connectFailed);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/InputHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/InputHost.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/InputHost.java
deleted file mode 100644
index 4862b76..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/InputHost.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * A host / port pair together with the inputs known to be available there
- * and not yet handed out. Access to the pending list is synchronized on the
- * instance. Equality and hash code are based on host and port only.
- */
-public class InputHost {
-
-  private final String host;
-  private final int port;
-
-  private final List<InputAttemptIdentifier> inputs = new LinkedList<InputAttemptIdentifier>();
-
-  /**
-   * @param hostName name of the host
-   * @param port port on that host
-   * @param appId accepted but not used by this class
-   */
-  public InputHost(String hostName, int port, ApplicationId appId) {
-    this.port = port;
-    this.host = hostName;
-  }
-
-  public String getHost() {
-    return host;
-  }
-
-  public int getPort() {
-    return port;
-  }
-
-  /** @return how many inputs are currently pending for this host */
-  public synchronized int getNumPendingInputs() {
-    return this.inputs.size();
-  }
-
-  /** Appends an input to the pending list. */
-  public synchronized void addKnownInput(InputAttemptIdentifier srcAttempt) {
-    this.inputs.add(srcAttempt);
-  }
-
-  /**
-   * Empties the pending list.
-   *
-   * @return a copy of the inputs that were pending before the call
-   */
-  public synchronized List<InputAttemptIdentifier> clearAndGetPendingInputs() {
-    List<InputAttemptIdentifier> snapshot = new ArrayList<InputAttemptIdentifier>(
-        this.inputs);
-    this.inputs.clear();
-    return snapshot;
-  }
-
-  @Override
-  public int hashCode() {
-    int hostHash = (host == null) ? 0 : host.hashCode();
-    int result = 31 + hostHash;
-    return 31 * result + port;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    InputHost that = (InputHost) obj;
-    boolean sameHost = (host == null) ? that.host == null : host.equals(that.host);
-    return sameHost && port == that.port;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/MemoryFetchedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/MemoryFetchedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/MemoryFetchedInput.java
deleted file mode 100644
index 59d288e..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/MemoryFetchedInput.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-import com.google.common.base.Preconditions;
-
-public class MemoryFetchedInput extends FetchedInput {
-
- private BoundedByteArrayOutputStream byteStream;
-
- public MemoryFetchedInput(long size,
- InputAttemptIdentifier inputAttemptIdentifier,
- FetchedInputCallback callbackHandler) {
- super(Type.MEMORY, size, inputAttemptIdentifier, callbackHandler);
- this.byteStream = new BoundedByteArrayOutputStream((int) size);
- }
-
- @Override
- public OutputStream getOutputStream() {
- return byteStream;
- }
-
- @Override
- public InputStream getInputStream() {
- return new ByteArrayInputStream(byteStream.getBuffer());
- }
-
- public byte[] getBytes() {
- return byteStream.getBuffer();
- }
-
- @Override
- public void commit() {
- if (state == State.PENDING) {
- state = State.COMMITTED;
- notifyFetchComplete();
- }
- }
-
- @Override
- public void abort() {
- if (state == State.PENDING) {
- state = State.ABORTED;
- notifyFetchFailure();
- }
- }
-
- @Override
- public void free() {
- Preconditions.checkState(
- state == State.COMMITTED || state == State.ABORTED,
- "FetchedInput can only be freed after it is committed or aborted");
- if (state == State.COMMITTED) {
- state = State.FREED;
- this.byteStream = null;
- notifyFreedResource();
- }
- }
-
- @Override
- public String toString() {
- return "MemoryFetchedInput [inputAttemptIdentifier="
- + inputAttemptIdentifier + ", size=" + size + ", type=" + type
- + ", id=" + id + ", state=" + state + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
deleted file mode 100644
index 84d270d..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.shuffle.common;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import javax.crypto.SecretKey;
-
-import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.JobTokenSecretManager;
-
-/**
- * Static helpers for the shuffle: job-token (de)serialisation, decoding of the
- * shuffle provider metadata and construction of the shuffle handler base URI.
- */
-public class ShuffleUtils {
-
-  public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce.shuffle";
-
-  /**
-   * Reads a serialised job token from {@code meta} and derives the secret key
-   * from its password.
-   *
-   * @param meta buffer holding the serialised token
-   * @return the secret key created from the token's password
-   * @throws IOException if the token cannot be read
-   */
-  public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
-      throws IOException {
-    DataInputByteBuffer tokenIn = new DataInputByteBuffer();
-    tokenIn.reset(meta);
-    Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>();
-    jobToken.readFields(tokenIn);
-    return JobTokenSecretManager.createSecretKey(jobToken.getPassword());
-  }
-
-  /**
-   * Serialises {@code jobToken}.
-   *
-   * @param jobToken the token to serialise
-   * @return a buffer wrapping exactly the bytes that were written
-   * @throws IOException if the token cannot be written
-   */
-  public static ByteBuffer convertJobTokenToBytes(
-      Token<JobTokenIdentifier> jobToken) throws IOException {
-    DataOutputBuffer tokenOut = new DataOutputBuffer();
-    jobToken.write(tokenOut);
-    return ByteBuffer.wrap(tokenOut.getData(), 0, tokenOut.getLength());
-  }
-
-  /**
-   * Decodes the shuffle provider metadata, which is read as a single int: the port.
-   *
-   * @param meta buffer holding the metadata
-   * @return the port read from the buffer
-   * @throws IOException if the value cannot be read
-   */
-  public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
-      throws IOException {
-    DataInputByteBuffer metaIn = new DataInputByteBuffer();
-    try {
-      metaIn.reset(meta);
-      return metaIn.readInt();
-    } finally {
-      metaIn.close();
-    }
-  }
-
-  /**
-   * Starts the shuffle handler request URI, of the form
-   * {@code http://host:port/mapOutput?job=<id>&reduce=<partition>&map=}, where
-   * the job id is the application id text with "application" replaced by
-   * "job". The caller appends the map identifiers.
-   */
-  // TODO NEWTEZ handle ssl shuffle
-  public static StringBuilder constructBaseURIForShuffleHandler(String host, int port, int partition, ApplicationId appId) {
-    String jobId = appId.toString().replace("application", "job");
-    return new StringBuilder("http://")
-        .append(host)
-        .append(":")
-        .append(String.valueOf(port))
-        .append("/")
-        .append("mapOutput?job=")
-        .append(jobId)
-        .append("&reduce=")
-        .append(String.valueOf(partition))
-        .append("&map=");
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/proto/Events.proto
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/proto/Events.proto b/tez-engine/src/main/proto/Events.proto
deleted file mode 100644
index fa9cb2c..0000000
--- a/tez-engine/src/main/proto/Events.proto
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tez.engine.api.events";
-option java_outer_classname = "SystemEventProtos";
-option java_generate_equals_and_hash = true;
-
-message TaskAttemptFailedEventProto {
- optional string diagnostics = 1;
-}
-
-message TaskAttemptCompletedEventProto {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/main/proto/ShufflePayloads.proto
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/proto/ShufflePayloads.proto b/tez-engine/src/main/proto/ShufflePayloads.proto
deleted file mode 100644
index f831de2..0000000
--- a/tez-engine/src/main/proto/ShufflePayloads.proto
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.tez.engine.common.shuffle.newimpl";
-option java_outer_classname = "ShuffleUserPayloads";
-option java_generate_equals_and_hash = true;
-
-message DataMovementEventPayloadProto {
- optional bool output_generated = 1;
- optional string host = 2;
- optional int32 port = 3;
- optional string path_component = 4;
- optional int32 run_duration = 5;
-}
-
-message InputInformationEventPayloadProto {
- optional int32 partition_range = 1;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java b/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java
deleted file mode 100644
index 7276782..0000000
--- a/tez-engine/src/test/java/org/apache/tez/engine/common/objectregistry/TestObjectRegistry.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.objectregistry;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-
-public class TestObjectRegistry {
-
- @SuppressWarnings("unused")
- @Before
- public void setup() {
- Injector injector = Guice.createInjector(new ObjectRegistryModule());
- }
-
- @Test
- public void testBasicCRUD() {
- ObjectRegistry objectRegistry =
- ObjectRegistryFactory.getObjectRegistry();
- Assert.assertNotNull(objectRegistry);
-
- Assert.assertNull(objectRegistry.get("foo"));
- Assert.assertFalse(objectRegistry.delete("foo"));
- Integer one = new Integer(1);
- Integer two_1 = new Integer(2);
- Integer two_2 = new Integer(3);
- Assert.assertNull(objectRegistry.add(ObjectLifeCycle.DAG, "one", one));
- Assert.assertEquals(one, objectRegistry.get("one"));
- Assert.assertNull(objectRegistry.add(ObjectLifeCycle.DAG, "two", two_1));
- Assert.assertNotNull(objectRegistry.add(ObjectLifeCycle.SESSION, "two", two_2));
- Assert.assertNotEquals(two_1, objectRegistry.get("two"));
- Assert.assertEquals(two_2, objectRegistry.get("two"));
- Assert.assertTrue(objectRegistry.delete("one"));
- Assert.assertFalse(objectRegistry.delete("one"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 05675b5..e98b45f 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -77,17 +77,17 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
-import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
-import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
-import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.common.objectregistry.ObjectLifeCycle;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistry;
+import org.apache.tez.runtime.common.objectregistry.ObjectRegistryFactory;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import com.google.common.annotations.VisibleForTesting;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index ec419c1..cad79f5 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -70,14 +70,14 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
/**
* An MRR job built on top of word count to return words sorted by
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 7e662cb..7280a1f 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -68,8 +68,6 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.examples.MRRSleepJob;
import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
@@ -82,6 +80,8 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/pom.xml b/tez-mapreduce/pom.xml
index 2c6b78e..aa3d915 100644
--- a/tez-mapreduce/pom.xml
+++ b/tez-mapreduce/pom.xml
@@ -35,7 +35,12 @@
</dependency>
<dependency>
<groupId>org.apache.tez</groupId>
- <artifactId>tez-engine</artifactId>
+ <artifactId>tez-runtime-library</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-runtime-internals</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index f2b0a38..22d4a75 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -853,7 +853,7 @@
// LOG.info(TezJobConfig.LOCAL_DIRS + " for child : " + taskAttemptID +
// " is " + childMapredLocalDir);
// conf.set(TezJobConfig.LOCAL_DIRS, childMapredLocalDir.toString());
-// conf.setClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+// conf.setClass(Constants.TEZ_RUNTIME_TASK_OUTPUT_MANAGER,
// TezLocalTaskOutputFiles.class, TezTaskOutput.class);
// }
//
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
index 199bbfe..3bc8da2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/combine/MRCombiner.java
@@ -42,18 +42,18 @@ import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.TezTaskContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.ValuesIterator;
-import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezTaskContext;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.ValuesIterator;
+import org.apache.tez.runtime.library.common.combine.Combiner;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.common.sort.impl.IFile.Writer;
@SuppressWarnings({"rawtypes", "unchecked"})
public class MRCombiner implements Combiner {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 21a3983..dac92ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -23,9 +23,10 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.common.Constants;
+
import com.google.common.collect.Maps;
public class DeprecatedKeys {
@@ -50,15 +51,15 @@ public class DeprecatedKeys {
/**
- * Keys used by the engine.
+ * Keys used by the Tez Runtime.
*/
- private static Map<String, String> mrParamToEngineParamMap =
+ private static Map<String, String> mrParamToTezRuntimeParamMap =
new HashMap<String, String>();
static {
- populateMRToEngineParamMap();
+ populateMRToTezRuntimeParamMap();
populateMRToDagParamMap();
populateMultiStageParamMap();
addDeprecatedKeys();
@@ -70,32 +71,32 @@ public class DeprecatedKeys {
multiStageParamMap.put(
MRJobConfig.KEY_COMPARATOR,
getDeprecationMap(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS));
multiStageParamMap.put(
MRJobConfig.MAP_OUTPUT_KEY_CLASS,
getDeprecationMap(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_CLASS,
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS));
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS));
multiStageParamMap.put(
MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
getDeprecationMap(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_VALUE_CLASS,
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS));
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS));
multiStageParamMap.put(
MRJobConfig.MAP_OUTPUT_COMPRESS,
getDeprecationMap(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_IS_COMPRESSED,
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED,
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS));
multiStageParamMap.put(
MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
getDeprecationMap(
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_COMPRESS_CODEC,
- TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC,
+ TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC));
}
private static Map<MultiStageKeys, String> getDeprecationMap(String inputKey, String outputKey) {
@@ -130,23 +131,23 @@ public class DeprecatedKeys {
public static void init() {
}
- private static void populateMRToEngineParamMap() {
+ private static void populateMRToTezRuntimeParamMap() {
- registerMRToEngineKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD);
+ registerMRToRuntimeKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD);
- registerMRToEngineKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_ENGINE_IFILE_READAHEAD_BYTES);
+ registerMRToRuntimeKeyTranslation(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, TezJobConfig.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
- registerMRToEngineKeyTranslation(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.RECORDS_BEFORE_PROGRESS, TezJobConfig.RECORDS_BEFORE_PROGRESS);
- registerMRToEngineKeyTranslation(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.IO_SORT_FACTOR, TezJobConfig.TEZ_RUNTIME_IO_SORT_FACTOR);
- registerMRToEngineKeyTranslation(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_SORT_SPILL_PERCENT, TezJobConfig.TEZ_RUNTIME_SORT_SPILL_PERCENT);
- registerMRToEngineKeyTranslation(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_ENGINE_IO_SORT_MB);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.IO_SORT_MB, TezJobConfig.TEZ_RUNTIME_IO_SORT_MB);
- registerMRToEngineKeyTranslation(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.INDEX_CACHE_MEMORY_LIMIT, TezJobConfig.TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
- registerMRToEngineKeyTranslation(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.MAP_COMBINE_MIN_SPILLS, TezJobConfig.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
// Counter replacement will work in this manner, as long as TezCounters
// extends MRCounters and is used directly by the Mapper/Reducer.
@@ -154,56 +155,56 @@ public class DeprecatedKeys {
// may break.
// Framework counters, like FILESYSTEM will likely be incompatible since
// they enum key belongs to a different package.
- registerMRToEngineKeyTranslation(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.COUNTERS_MAX_KEY, TezJobConfig.COUNTERS_MAX_KEY);
- registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY, TezJobConfig.COUNTER_GROUP_NAME_MAX_KEY);
- registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.COUNTER_NAME_MAX_KEY, TezJobConfig.COUNTER_NAME_MAX_KEY);
- registerMRToEngineKeyTranslation(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.COUNTER_GROUPS_MAX_KEY, TezJobConfig.COUNTER_GROUPS_MAX_KEY);
- registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Constants.TEZ_ENGINE_TASK_MEMORY);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMORY_TOTAL_BYTES, Constants.TEZ_RUNTIME_TASK_MEMORY);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_PARALLEL_COPIES, TezJobConfig.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_ENGINE_SHUFFLE_FETCH_FAILURES);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_FETCH_FAILURES, TezJobConfig.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_NOTIFY_READERROR, TezJobConfig.TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_CONNECT_TIMEOUT);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_ENGINE_SHUFFLE_READ_TIMEOUT);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_READ_TIMEOUT, TezJobConfig.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
- registerMRToEngineKeyTranslation(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_SSL);
+ registerMRToRuntimeKeyTranslation(MRConfig.SHUFFLE_SSL_ENABLED_KEY, TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_INPUT_BUFFER_PERCENT);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_RUNTIME_SHUFFLE_INPUT_BUFFER_PERCENT);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_MEMORY_LIMIT_PERCENT, TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
- registerMRToEngineKeyTranslation(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_ENGINE_SHUFFLE_MERGE_PERCENT);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.SHUFFLE_MERGE_PERCENT, TezJobConfig.TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
- registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_ENGINE_SHUFFLE_MEMTOMEM_SEGMENTS);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_THRESHOLD, TezJobConfig.TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
- registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_ENGINE_SHUFFLE_ENABLE_MEMTOMEM);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_MEMTOMEM_ENABLED, TezJobConfig.TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
- registerMRToEngineKeyTranslation(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_ENGINE_INPUT_BUFFER_PERCENT);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.REDUCE_INPUT_BUFFER_PERCENT, TezJobConfig.TEZ_RUNTIME_INPUT_BUFFER_PERCENT);
- registerMRToEngineKeyTranslation(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY, TezJobConfig.DAG_CREDENTIALS_BINARY);
- registerMRToEngineKeyTranslation("map.sort.class", TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS);
+ registerMRToRuntimeKeyTranslation("map.sort.class", TezJobConfig.TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
- registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_GROUP_COMPARATOR_CLASS);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
- registerMRToEngineKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_ENGINE_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS);
+ registerMRToRuntimeKeyTranslation(MRJobConfig.GROUP_COMPARATOR_CLASS, TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS);
}
private static void addDeprecatedKeys() {
}
- private static void registerMRToEngineKeyTranslation(String mrKey,
+ private static void registerMRToRuntimeKeyTranslation(String mrKey,
String tezKey) {
- mrParamToEngineParamMap.put(mrKey, tezKey);
+ mrParamToTezRuntimeParamMap.put(mrKey, tezKey);
}
@SuppressWarnings("unused")
@@ -215,8 +216,8 @@ public class DeprecatedKeys {
return Collections.unmodifiableMap(mrParamToDAGParamMap);
}
- public static Map<String, String> getMRToEngineParamMap() {
- return Collections.unmodifiableMap(mrParamToEngineParamMap);
+ public static Map<String, String> getMRToTezRuntimeParamMap() {
+ return Collections.unmodifiableMap(mrParamToTezRuntimeParamMap);
}
// TODO Ideally, multi-stage should not be exposed.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index b0ed6ab..c39ca4a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -367,16 +367,16 @@ public class MRHelpers {
// the AM anyway.
// TODO eventually ACLs
- conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS, MRPartitioner.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
if (useNewApi) {
if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
- conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
}
} else {
if (conf.get("mapred.combiner.class") != null) {
- conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index d888c42..b07b04b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -221,7 +221,7 @@ public class MultiStageMRConfToTezTranslator {
int numStages = numIntermediateStages + (hasFinalReduceStage ? 2 : 1);
// Setup Tez partitioner class
- conf.set(TezJobConfig.TEZ_ENGINE_PARTITIONER_CLASS,
+ conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
MRPartitioner.class.getName());
// Setup Tez Combiner class if required.
@@ -229,11 +229,11 @@ public class MultiStageMRConfToTezTranslator {
boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
if (useNewApi) {
if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
- conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
}
} else {
if (conf.get("mapred.combiner.class") != null) {
- conf.set(TezJobConfig.TEZ_ENGINE_COMBINER_CLASS, MRCombiner.class.getName());
+ conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
}
}
@@ -259,7 +259,7 @@ public class MultiStageMRConfToTezTranslator {
}
private static void processDirectConversion(Configuration conf) {
- for (Entry<String, String> dep : DeprecatedKeys.getMRToEngineParamMap()
+ for (Entry<String, String> dep : DeprecatedKeys.getMRToTezRuntimeParamMap()
.entrySet()) {
if (conf.get(dep.getKey()) != null) {
// TODO Deprecation reason does not seem to reflect in the config ?
@@ -336,7 +336,7 @@ public class MultiStageMRConfToTezTranslator {
Configuration baseConf, String stage) {
JobConf jobConf = new JobConf(baseConf);
// Don't clobber explicit tez config.
- if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) {
+ if (conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS) == null) {
// If this is set, but the comparator is not set, and their types differ -
// the job will break.
if (conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS) == null) {
@@ -352,7 +352,7 @@ public class MultiStageMRConfToTezTranslator {
}
}
- if (conf.get(TezJobConfig.TEZ_ENGINE_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) {
+ if (conf.get(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS) == null) {
if (conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS) == null) {
conf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, jobConf
.getMapOutputValueClass().getName());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
index 0bcd45e..2378f58 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
@@ -57,7 +57,7 @@ public class MultiStageMRConfigUtil {
public static Configuration getAndRemoveBasicNonIntermediateStageConf(
Configuration baseConf) {
Configuration newConf = new Configuration(false);
- for (String key : DeprecatedKeys.getMRToEngineParamMap().keySet()) {
+ for (String key : DeprecatedKeys.getMRToTezRuntimeParamMap().keySet()) {
if (baseConf.get(key) != null) {
newConf.set(key, baseConf.get(key));
baseConf.unset(key);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
index 2a926d7..635af90 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapred/MRReporter.java
@@ -3,9 +3,9 @@ package org.apache.tez.mapreduce.hadoop.mapred;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.api.TezTaskContext;
import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.TezTaskContext;
public class MRReporter implements Reporter {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
index dcdb3ff..2d27c4b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/MapContextImpl.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.tez.engine.api.TezTaskContext;
+import org.apache.tez.runtime.api.TezTaskContext;
/**
* The context that is given to the {@link Mapper}.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
index 4035c71..be65be7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskAttemptContextImpl.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.tez.engine.api.TezTaskContext;
import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.runtime.api.TezTaskContext;
// NOTE: NEWTEZ: This is a copy of org.apache.tez.mapreduce.hadoop.mapred (not mapreduce). mapred likely does not need it's own copy of this class.
// Meant for use by the "mapreduce" API
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
index 05ea89c..5b5c8ec 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/mapreduce/TaskInputOutputContextImpl.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.tez.engine.api.TezTaskContext;
+import org.apache.tez.runtime.api.TezTaskContext;
/**
* A context object that allows input and output from the task. It is only
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 6066d93..b9f2242 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -46,13 +46,14 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.TezInputContext;
import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Input;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.library.api.KVReader;
import com.google.common.base.Preconditions;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index e6bdbe6..11184e4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -26,16 +26,16 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezOutputContext;
import org.apache.tez.mapreduce.common.Utils;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.library.api.KVWriter;
public class MROutput implements LogicalOutput {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index d061ad5..224900e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -24,11 +24,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.ConfigUtils;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.runtime.library.common.ConfigUtils;
@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
+public class MRPartitioner implements org.apache.tez.runtime.library.api.Partitioner {
static final Log LOG = LogFactory.getLog(MRPartitioner.class);
@@ -40,7 +40,7 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
public MRPartitioner(Configuration conf) {
this.useNewApi = ConfigUtils.useNewApi(conf);
- this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
+ this.partitions = conf.getInt(TezJobConfig.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, 1);
if (useNewApi) {
if (partitions > 1) {
@@ -85,4 +85,4 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
return oldPartitioner.getPartition(key, value, numPartitions);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
index fac1454..5471c55 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTask.java
@@ -62,7 +62,6 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
-import org.apache.tez.common.Constants;
import org.apache.tez.common.TezJobConfig;
import org.apache.tez.common.TezTaskStatus.State;
import org.apache.tez.common.TezUtils;
@@ -70,11 +69,6 @@ import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.IDConverter;
import org.apache.tez.mapreduce.hadoop.MRConfig;
@@ -82,6 +76,12 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.security.JobTokenIdentifier;
+import org.apache.tez.runtime.library.common.security.TokenCache;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
@SuppressWarnings("deprecation")
public abstract class MRTask {
@@ -155,7 +155,7 @@ public abstract class MRTask {
} else {
this.jobConf = new JobConf(conf);
}
- jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
+ jobConf.set(Constants.TEZ_RUNTIME_TASK_ATTEMPT_ID,
taskAttemptId.toString());
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
context.getDAGAttemptNumber());
@@ -687,7 +687,4 @@ public abstract class MRTask {
return taskAttemptId;
}
- public TezProcessorContext getTezEngineTaskContext() {
- return processorContext;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
index 85139ed..74a34af 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
@@ -24,12 +24,12 @@ import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.Reporter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.TezInputContext;
-import org.apache.tez.engine.api.TezOutputContext;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.api.TezTaskContext;
import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.runtime.api.TezInputContext;
+import org.apache.tez.runtime.api.TezOutputContext;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.TezTaskContext;
@InterfaceAudience.Private
@InterfaceStability.Unstable
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
index e4b990a..b7ecddd 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
@@ -35,20 +35,20 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalIOProcessor;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class MapProcessor extends MRTask implements LogicalIOProcessor {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 19acb39..1ba76f6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -37,20 +37,20 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Event;
-import org.apache.tez.engine.api.KVReader;
-import org.apache.tez.engine.api.KVWriter;
-import org.apache.tez.engine.api.LogicalIOProcessor;
-import org.apache.tez.engine.api.LogicalInput;
-import org.apache.tez.engine.api.LogicalOutput;
-import org.apache.tez.engine.api.TezProcessorContext;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.input.ShuffledMergedInputLegacy;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.MRTask;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.LogicalIOProcessor;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.library.api.KVReader;
+import org.apache.tez.runtime.library.api.KVWriter;
+import org.apache.tez.runtime.library.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
+import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@SuppressWarnings({ "unchecked", "rawtypes" })
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
index 9de2ed1..08b66eb 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilical.java
@@ -24,9 +24,9 @@ import java.util.Collection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.impl.EventMetaData;
-import org.apache.tez.engine.api.impl.TezEvent;
-import org.apache.tez.engine.api.impl.TezUmbilical;
+import org.apache.tez.runtime.api.impl.EventMetaData;
+import org.apache.tez.runtime.api.impl.TezEvent;
+import org.apache.tez.runtime.api.impl.TezUmbilical;
public class TestUmbilical implements TezUmbilical {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b212ca1d/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
index b75f01e..5e3d201 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestConfigTranslationMRToTez.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.runtime.library.common.ConfigUtils;
import org.junit.Test;
public class TestConfigTranslationMRToTez {