You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/08/14 00:19:48 UTC
[2/2] git commit: TEZ-1347. Consolidate MRHelpers. (sseth)
TEZ-1347. Consolidate MRHelpers. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/0d5fdf3d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/0d5fdf3d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/0d5fdf3d
Branch: refs/heads/master
Commit: 0d5fdf3dec8c22ba642eaa10b760fa8aced1c74e
Parents: 0f2abb1
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Aug 13 15:19:29 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Aug 13 15:19:29 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/tez/client/TezClientUtils.java | 1 +
.../org/apache/tez/common/TezCommonUtils.java | 1 +
.../java/org/apache/tez/common/TezUtils.java | 151 ++++++++
.../api/events/InputInitializerEvent.java | 3 +
.../java/org/apache/tez/common/TezUtils.java | 320 -----------------
.../org/apache/tez/common/TezUtilsInternal.java | 224 ++++++++++++
.../org/apache/tez/common/TestTezUtils.java | 34 +-
.../org/apache/tez/dag/app/DAGAppMaster.java | 6 +-
.../impl/SimpleHistoryLoggingService.java | 4 +-
.../org/apache/tez/runtime/task/TezChild.java | 8 +-
.../mapreduce/examples/FilterLinesByWord.java | 8 +-
.../examples/FilterLinesByWordOneToOne.java | 8 +-
.../tez/mapreduce/examples/MRRSleepJob.java | 19 +-
.../examples/TestOrderedWordCount.java | 19 +-
.../processor/FilterByWordInputProcessor.java | 1 -
.../apache/tez/mapreduce/client/YARNRunner.java | 17 +-
.../mapreduce/committer/MROutputCommitter.java | 4 +-
.../common/MRInputAMSplitGenerator.java | 5 +-
.../common/MRInputSplitDistributor.java | 4 +-
.../apache/tez/mapreduce/hadoop/MRHelpers.java | 356 +++++++------------
.../org/apache/tez/mapreduce/input/MRInput.java | 12 +-
.../tez/mapreduce/input/base/MRInputBase.java | 6 +-
.../apache/tez/mapreduce/output/MROutput.java | 4 +-
.../apache/tez/mapreduce/processor/MRTask.java | 2 +-
.../common/TestMRInputSplitDistributor.java | 6 +-
.../hadoop/TestConfigTranslationMRToTez.java | 4 +-
.../mapreduce/hadoop/TestDeprecatedKeys.java | 4 +-
.../tez/mapreduce/hadoop/TestMRHelpers.java | 29 +-
.../tez/mapreduce/processor/MapUtils.java | 6 +-
.../processor/map/TestMapProcessor.java | 4 +-
.../processor/reduce/TestReduceProcessor.java | 7 +-
.../runtime/api/impl/TezCountersDelegate.java | 6 +-
.../library/common/shuffle/impl/Fetcher.java | 5 +-
.../common/shuffle/impl/MergeManager.java | 11 +-
.../library/common/shuffle/impl/Shuffle.java | 4 +-
.../shuffle/impl/ShuffleInputEventHandler.java | 4 +-
.../common/shuffle/impl/ShuffleScheduler.java | 4 +-
.../common/sort/impl/PipelinedSorter.java | 5 +-
.../common/sort/impl/dflt/DefaultSorter.java | 4 +-
.../writers/UnorderedPartitionedKVWriter.java | 8 +-
.../runtime/library/input/LocalMergedInput.java | 1 -
.../library/input/ShuffledMergedInput.java | 2 +-
.../library/input/ShuffledUnorderedKVInput.java | 3 +-
.../library/output/OnFileSortedOutput.java | 6 +-
.../library/output/OnFileUnorderedKVOutput.java | 5 +-
.../OnFileUnorderedPartitionedKVOutput.java | 2 +-
.../impl/ShuffleInputEventHandlerImpl.java | 4 +-
.../shuffle/common/impl/ShuffleManager.java | 4 +-
.../impl/TestShuffleInputEventHandler.java | 4 +-
.../TestUnorderedPartitionedKVWriter.java | 6 +-
.../library/output/TestOnFileSortedOutput.java | 2 +-
.../impl/TestShuffleInputEventHandlerImpl.java | 5 +-
.../apache/tez/mapreduce/TestMRRJobsDAGApi.java | 22 +-
.../tez/test/dag/SixLevelsFailingDAG.java | 1 -
.../tez/test/dag/ThreeLevelsFailingDAG.java | 2 -
56 files changed, 674 insertions(+), 724 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 05dd880..ec78b85 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -54,6 +54,7 @@ INCOMPATIBLE CHANGES
TEZ-1237. Consolidate naming of API classes
TEZ-1407. Move MRInput related methods out of MRHelpers and consolidate.
TEZ-1194. Make TezUserPayload user facing for payload specification
+ TEZ-1347. Consolidate MRHelpers.
Release 0.4.0-incubating: 2014-04-05
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 52e5027..8827108 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -100,6 +100,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+@Private
public class TezClientUtils {
private static Log LOG = LogFactory.getLog(TezClientUtils.class);
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index c15324c..edb8e8e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -38,6 +38,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import com.google.protobuf.ByteString;
+@Private
public class TezCommonUtils {
public static final FsPermission TEZ_AM_DIR_PERMISSION = FsPermission
.createImmutable((short) 0700); // rwx--------
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
new file mode 100644
index 0000000..f24dc11
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.Deflater;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.records.DAGProtos;
+
+
+/**
+ * Utility methods for setting up a DAG. Has helpers for setting up log4j configuration, converting
+ * {@link org.apache.hadoop.conf.Configuration} to {@link org.apache.tez.dag.api.UserPayload} etc.
+ */
+@InterfaceAudience.Public
+public class TezUtils {
+
+  /**
+   * Allows changing the log level for task / AM logging.
+   * <p>
+   * Adds the JVM system properties necessary to configure
+   * {@link org.apache.hadoop.yarn.ContainerLogAppender} by delegating to
+   * {@link TezClientUtils#addLog4jSystemProperties(String, List)}.
+   *
+   * @param logLevel the desired log level (eg INFO/WARN/DEBUG)
+   * @param vargs the argument list to append to
+   */
+  public static void addLog4jSystemProperties(String logLevel,
+      List<String> vargs) {
+    TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
+  }
+
+  /**
+   * Convert a Configuration to a compressed ByteString using Protocol Buffers.
+   * <p>
+   * Every entry yielded by iterating {@code conf} is written as a key/value pair into a
+   * {@code ConfigurationProto}, which is then deflated at {@link Deflater#BEST_SPEED}.
+   *
+   * @param conf Configuration to be converted; must not be null
+   * @return PB ByteString (compressed)
+   * @throws java.io.IOException if serializing or compressing the configuration fails
+   */
+  public static ByteString createByteStringFromConf(Configuration conf) throws IOException {
+    Preconditions.checkNotNull(conf, "Configuration must be specified");
+    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    ByteString.Output os = ByteString.newOutput();
+    DeflaterOutputStream compressOs = new DeflaterOutputStream(os, deflater);
+    try {
+      writeConfInPB(compressOs, conf);
+    } finally {
+      try {
+        // close() finishes the deflate stream, flushing the remaining compressed bytes into os.
+        compressOs.close();
+      } finally {
+        // DeflaterOutputStream.close() does not end a caller-supplied Deflater, so release its
+        // native zlib memory here rather than waiting for finalization.
+        deflater.end();
+      }
+    }
+    return os.toByteString();
+  }
+
+  /**
+   * Convert a Configuration to a {@link org.apache.tez.dag.api.UserPayload}.
+   * <p>
+   * The payload holds the bytes produced by {@link #createByteStringFromConf(Configuration)}.
+   *
+   * @param conf configuration to be converted
+   * @return an instance of {@link org.apache.tez.dag.api.UserPayload}
+   * @throws java.io.IOException if the configuration cannot be serialized
+   */
+  public static UserPayload createUserPayloadFromConf(Configuration conf) throws IOException {
+    return new UserPayload(createByteStringFromConf(conf).toByteArray());
+  }
+
+  /**
+   * Convert a byte string to a Configuration object.
+   * <p>
+   * The returned Configuration is created without loading default resources, so it contains
+   * only the entries decoded from {@code byteString}.
+   *
+   * @param byteString byteString representation of the conf created using {@link
+   *                   #createByteStringFromConf(org.apache.hadoop.conf.Configuration)}
+   * @return Configuration
+   * @throws java.io.IOException if the data cannot be inflated or parsed
+   */
+  public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
+    Preconditions.checkNotNull(byteString, "ByteString must be specified");
+    InflaterInputStream uncompressIs = new InflaterInputStream(byteString.newInput());
+    DAGProtos.ConfigurationProto confProto;
+    try {
+      confProto = DAGProtos.ConfigurationProto.parseFrom(uncompressIs);
+    } finally {
+      // Closing the stream also ends its internally created Inflater, releasing native memory.
+      uncompressIs.close();
+    }
+    Configuration conf = new Configuration(false);
+    readConfFromPB(confProto, conf);
+    return conf;
+  }
+
+  /**
+   * Convert an instance of {@link org.apache.tez.dag.api.UserPayload} to {@link
+   * org.apache.hadoop.conf.Configuration}.
+   *
+   * @param payload {@link org.apache.tez.dag.api.UserPayload} created using {@link
+   *                #createUserPayloadFromConf(org.apache.hadoop.conf.Configuration)}
+   * @return Configuration
+   * @throws java.io.IOException if the payload cannot be inflated or parsed
+   */
+  public static Configuration createConfFromUserPayload(UserPayload payload) throws IOException {
+    Preconditions.checkNotNull(payload, "UserPayload must be specified");
+    return createConfFromByteString(ByteString.copyFrom(payload.getPayload()));
+  }
+
+  /**
+   * Serialize every entry of {@code conf} into a ConfigurationProto and write it to {@code dos}.
+   */
+  private static void writeConfInPB(OutputStream dos, Configuration conf) throws IOException {
+    DAGProtos.ConfigurationProto.Builder confProtoBuilder = DAGProtos.ConfigurationProto
+        .newBuilder();
+    Iterator<Map.Entry<String, String>> iter = conf.iterator();
+    while (iter.hasNext()) {
+      Map.Entry<String, String> entry = iter.next();
+      DAGProtos.PlanKeyValuePair.Builder kvp = DAGProtos.PlanKeyValuePair.newBuilder();
+      kvp.setKey(entry.getKey());
+      kvp.setValue(entry.getValue());
+      confProtoBuilder.addConfKeyValues(kvp);
+    }
+    DAGProtos.ConfigurationProto confProto = confProtoBuilder.build();
+    confProto.writeTo(dos);
+  }
+
+  /**
+   * Copy every key/value pair in {@code confProto} into {@code conf} via Configuration.set.
+   */
+  private static void readConfFromPB(DAGProtos.ConfigurationProto confProto, Configuration conf) {
+    List<DAGProtos.PlanKeyValuePair> settingList = confProto.getConfKeyValuesList();
+    for (DAGProtos.PlanKeyValuePair setting : settingList) {
+      conf.set(setting.getKey(), setting.getValue());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
index 87198c1..3d1ed11 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/events/InputInitializerEvent.java
@@ -20,6 +20,7 @@
package org.apache.tez.runtime.api.events;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.Event;
@@ -46,6 +47,8 @@ public class InputInitializerEvent extends Event {
*/
public InputInitializerEvent(String targetVertexName, String targetInputName,
byte[] eventPayload, int version) {
+ Preconditions.checkNotNull(targetVertexName, "TargetVertexName cannot be null");
+ Preconditions.checkNotNull(targetInputName, "TargetInputName cannot be null");
this.targetVertexName = targetVertexName;
this.targetInputName = targetInputName;
this.version = version;
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
deleted file mode 100644
index 3b3c7a7..0000000
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.tez.common;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.zip.DataFormatException;
-import java.util.zip.Deflater;
-import java.util.zip.DeflaterOutputStream;
-import java.util.zip.Inflater;
-import java.util.zip.InflaterInputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Logger;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
-import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
-import com.google.protobuf.ByteString;
-
-public class TezUtils {
-
- private static final Log LOG = LogFactory.getLog(TezUtils.class);
- private static final Random RANDOM = new Random();
-
- public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
- IOException {
- FileInputStream confPBBinaryStream = null;
- ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
- try {
- confPBBinaryStream =
- new FileInputStream(new File(baseDir, TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
- confProtoBuilder.mergeFrom(confPBBinaryStream);
- } finally {
- if (confPBBinaryStream != null) {
- confPBBinaryStream.close();
- }
- }
-
- ConfigurationProto confProto = confProtoBuilder.build();
-
- List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
- if (kvPairList != null && !kvPairList.isEmpty()) {
- for (PlanKeyValuePair kvPair : kvPairList) {
- conf.set(kvPair.getKey(), kvPair.getValue());
- }
- }
- }
-
- /**
- * Convert a Configuration to compressed ByteString using Protocol buffer
- *
- * @param conf
- * : Configuration to be converted
- * @return PB ByteString (compressed)
- * @throws IOException
- */
- public static ByteString createByteStringFromConf(Configuration conf) throws IOException {
- Preconditions.checkNotNull(conf, "Configuration must be specified");
- ByteString.Output os = ByteString.newOutput();
- DeflaterOutputStream compressOs = new DeflaterOutputStream(os,
- new Deflater(Deflater.BEST_SPEED));
- try {
- writeConfInPB(compressOs, conf);
- } finally {
- if (compressOs != null) {
- compressOs.close();
- }
- }
- return os.toByteString();
- }
-
- /**
- * Convert a Configuration to compressed user pay load (i.e. byte[]) using
- * Protocol buffer
- *
- * @param conf
- * : Configuration to be converted
- * @return compressed pay load
- * @throws IOException
- */
- public static UserPayload createUserPayloadFromConf(Configuration conf) throws IOException {
- return new UserPayload(createByteStringFromConf(conf).toByteArray());
- }
-
- /**
- * Convert compressed byte string to a Configuration object using protocol
- * buffer
- *
- * @param byteString
- * :compressed conf in Protocol buffer
- * @return Configuration
- * @throws IOException
- */
- public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
- Preconditions.checkNotNull(byteString, "ByteString must be specified");
- // SnappyInputStream uncompressIs = new
- // SnappyInputStream(byteString.newInput());
- InflaterInputStream uncompressIs = new InflaterInputStream(byteString.newInput());
- ConfigurationProto confProto = ConfigurationProto.parseFrom(uncompressIs);
- Configuration conf = new Configuration(false);
- readConfFromPB(confProto, conf);
- return conf;
- }
-
- /**
- * Convert compressed pay load in byte[] to a Configuration object using
- * protocol buffer
- *
- * @param payload
- * : compressed pay load
- * @return Configuration
- * @throws IOException
- */
- public static Configuration createConfFromUserPayload(UserPayload payload) throws IOException {
- return createConfFromByteString(ByteString.copyFrom(payload.getPayload()));
- }
-
- private static void writeConfInPB(OutputStream dos, Configuration conf) throws IOException {
- ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
- Iterator<Entry<String, String>> iter = conf.iterator();
- while (iter.hasNext()) {
- Entry<String, String> entry = iter.next();
- PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
- kvp.setKey(entry.getKey());
- kvp.setValue(entry.getValue());
- confProtoBuilder.addConfKeyValues(kvp);
- }
- ConfigurationProto confProto = confProtoBuilder.build();
- confProto.writeTo(dos);
- }
-
- private static void readConfFromPB(ConfigurationProto confProto, Configuration conf) {
- List<PlanKeyValuePair> settingList = confProto.getConfKeyValuesList();
- for (PlanKeyValuePair setting : settingList) {
- conf.set(setting.getKey(), setting.getValue());
- }
- }
-
- public static byte[] compressBytes(byte[] inBytes) throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
- byte[] compressed = compressBytesInflateDeflate(inBytes);
- if (LOG.isDebugEnabled()) {
- sw.stop();
- LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
- + ", CompressTime: " + sw.elapsedMillis());
- }
- return compressed;
- }
-
- public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
- Stopwatch sw = null;
- if (LOG.isDebugEnabled()) {
- sw = new Stopwatch().start();
- }
- byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
- if (LOG.isDebugEnabled()) {
- sw.stop();
- LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
- + ", UncompressTimeTaken: " + sw.elapsedMillis());
- }
- return uncompressed;
- }
-
- private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
- Deflater deflater = new Deflater(Deflater.BEST_SPEED);
- deflater.setInput(inBytes);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
- deflater.finish();
- byte[] buffer = new byte[1024 * 8];
- while (!deflater.finished()) {
- int count = deflater.deflate(buffer);
- bos.write(buffer, 0, count);
- }
- byte[] output = bos.toByteArray();
- return output;
- }
-
- private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
- Inflater inflater = new Inflater();
- inflater.setInput(inBytes);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
- byte[] buffer = new byte[1024 * 8];
- while (!inflater.finished()) {
- int count;
- try {
- count = inflater.inflate(buffer);
- } catch (DataFormatException e) {
- throw new IOException(e);
- }
- bos.write(buffer, 0, count);
- }
- byte[] output = bos.toByteArray();
- return output;
- }
-
- private static final Pattern pattern = Pattern.compile("\\W");
- @Private
- public static final int MAX_VERTEX_NAME_LENGTH = 40;
-
- @Private
- public static String cleanVertexName(String vertexName) {
- return sanitizeString(vertexName).substring(0,
- vertexName.length() > MAX_VERTEX_NAME_LENGTH ? MAX_VERTEX_NAME_LENGTH : vertexName.length());
- }
-
- private static String sanitizeString(String srcString) {
- Matcher matcher = pattern.matcher(srcString);
- String res = matcher.replaceAll("_");
- return res; // Number starts allowed rightnow
- }
-
- public static void updateLoggers(String addend) throws FileNotFoundException {
-
- LOG.info("Redirecting log file based on addend: " + addend);
-
- Appender appender = Logger.getRootLogger().getAppender(
- TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
- if (appender != null) {
- if (appender instanceof TezContainerLogAppender) {
- TezContainerLogAppender claAppender = (TezContainerLogAppender) appender;
- claAppender.setLogFileName(constructLogFileName(
- TezConfiguration.TEZ_CONTAINER_LOG_FILE_NAME, addend));
- claAppender.activateOptions();
- } else {
- LOG.warn("Appender is a " + appender.getClass() + "; require an instance of "
- + TezContainerLogAppender.class.getName() + " to reconfigure the logger output");
- }
- } else {
- LOG.warn("Not configured with appender named: " + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME
- + ". Cannot reconfigure logger output");
- }
- }
-
- private static String constructLogFileName(String base, String addend) {
- if (addend == null || addend.isEmpty()) {
- return base;
- } else {
- return base + "_" + addend;
- }
- }
-
- public static BitSet fromByteArray(byte[] bytes) {
- if (bytes == null) {
- return new BitSet();
- }
- BitSet bits = new BitSet();
- for (int i = 0; i < bytes.length * 8; i++) {
- if ((bytes[(bytes.length) - (i / 8) - 1] & (1 << (i % 8))) > 0) {
- bits.set(i);
- }
- }
- return bits;
- }
-
- public static byte[] toByteArray(BitSet bits) {
- if (bits == null) {
- return null;
- }
- byte[] bytes = new byte[bits.length() / 8 + 1];
- for (int i = 0; i < bits.length(); i++) {
- if (bits.get(i)) {
- bytes[(bytes.length) - (i / 8) - 1] |= 1 << (i % 8);
- }
- }
- return bytes;
- }
-
- public static String getContainerLogDir() {
- String logDirsStr = System.getenv(Environment.LOG_DIRS.name());
- if (logDirsStr == null || logDirsStr.isEmpty()) {
- return null;
- }
- String[] logDirs = StringUtils.split(logDirsStr, ',');
- if (logDirs.length == 0) {
- return null;
- }
- int logIndex = RANDOM.nextInt(logDirs.length);
- return logDirs[logIndex];
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
new file mode 100644
index 0000000..05feae7
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.common;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.DataFormatException;
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.log4j.Appender;
+import org.apache.log4j.Logger;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
+import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Internal (non-public-API) helpers: loading the user-specified PB configuration, byte[]
+ * compression, vertex name sanitization, log redirection, BitSet (de)serialization and
+ * container log directory selection.
+ */
+@Private
+public class TezUtilsInternal {
+
+  private static final Log LOG = LogFactory.getLog(TezUtilsInternal.class);
+  private static final Random RANDOM = new Random();
+
+  /**
+   * Reads the binary ConfigurationProto stored in the file
+   * {@link TezConfiguration#TEZ_PB_BINARY_CONF_NAME} under {@code baseDir} and sets every
+   * key/value pair it contains on {@code conf}.
+   *
+   * @param baseDir directory containing the PB-encoded configuration file
+   * @param conf configuration to update in place
+   * @throws IOException if the file cannot be opened, read or parsed
+   */
+  public static void addUserSpecifiedTezConfiguration(String baseDir, Configuration conf) throws
+      IOException {
+    FileInputStream confPBBinaryStream = null;
+    ConfigurationProto.Builder confProtoBuilder = ConfigurationProto.newBuilder();
+    try {
+      confPBBinaryStream =
+          new FileInputStream(new File(baseDir, TezConfiguration.TEZ_PB_BINARY_CONF_NAME));
+      confProtoBuilder.mergeFrom(confPBBinaryStream);
+    } finally {
+      if (confPBBinaryStream != null) {
+        confPBBinaryStream.close();
+      }
+    }
+
+    ConfigurationProto confProto = confProtoBuilder.build();
+
+    List<PlanKeyValuePair> kvPairList = confProto.getConfKeyValuesList();
+    if (kvPairList != null && !kvPairList.isEmpty()) {
+      for (PlanKeyValuePair kvPair : kvPairList) {
+        conf.set(kvPair.getKey(), kvPair.getValue());
+      }
+    }
+  }
+
+  /**
+   * Compresses {@code inBytes} with a {@link Deflater} at {@link Deflater#BEST_SPEED}. The
+   * uncompressed size, compressed size and elapsed time are logged at DEBUG level.
+   *
+   * @param inBytes bytes to compress
+   * @return the compressed bytes
+   * @throws IOException not thrown by the current deflate implementation
+   */
+  public static byte[] compressBytes(byte[] inBytes) throws IOException {
+    Stopwatch sw = null;
+    if (LOG.isDebugEnabled()) {
+      sw = new Stopwatch().start();
+    }
+    byte[] compressed = compressBytesInflateDeflate(inBytes);
+    if (sw != null) {
+      sw.stop();
+      LOG.debug("UncompressedSize: " + inBytes.length + ", CompressedSize: " + compressed.length
+          + ", CompressTime: " + sw.elapsedMillis());
+    }
+    return compressed;
+  }
+
+  /**
+   * Uncompresses bytes produced by {@link #compressBytes(byte[])}. The compressed size,
+   * uncompressed size and elapsed time are logged at DEBUG level.
+   *
+   * @param inBytes zlib-compressed bytes
+   * @return the uncompressed bytes
+   * @throws IOException if the data is malformed, truncated or requires a preset dictionary
+   */
+  public static byte[] uncompressBytes(byte[] inBytes) throws IOException {
+    Stopwatch sw = null;
+    if (LOG.isDebugEnabled()) {
+      sw = new Stopwatch().start();
+    }
+    byte[] uncompressed = uncompressBytesInflateDeflate(inBytes);
+    if (sw != null) {
+      sw.stop();
+      LOG.debug("CompressedSize: " + inBytes.length + ", UncompressedSize: " + uncompressed.length
+          + ", UncompressTimeTaken: " + sw.elapsedMillis());
+    }
+    return uncompressed;
+  }
+
+  /** Deflates {@code inBytes} in one shot, always releasing the Deflater's native memory. */
+  private static byte[] compressBytesInflateDeflate(byte[] inBytes) {
+    Deflater deflater = new Deflater(Deflater.BEST_SPEED);
+    try {
+      deflater.setInput(inBytes);
+      deflater.finish();
+      ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+      byte[] buffer = new byte[1024 * 8];
+      while (!deflater.finished()) {
+        int count = deflater.deflate(buffer);
+        bos.write(buffer, 0, count);
+      }
+      return bos.toByteArray();
+    } finally {
+      // Release native zlib memory now instead of waiting for finalization.
+      deflater.end();
+    }
+  }
+
+  /**
+   * Inflates {@code inBytes} in one shot, always releasing the Inflater's native memory.
+   *
+   * @throws IOException if the data is malformed, or if the inflater stops making progress
+   *                     because the input is truncated or needs a preset dictionary
+   */
+  private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
+    Inflater inflater = new Inflater();
+    try {
+      inflater.setInput(inBytes);
+      ByteArrayOutputStream bos = new ByteArrayOutputStream(inBytes.length);
+      byte[] buffer = new byte[1024 * 8];
+      while (!inflater.finished()) {
+        int count;
+        try {
+          count = inflater.inflate(buffer);
+        } catch (DataFormatException e) {
+          throw new IOException(e);
+        }
+        if (count == 0 && !inflater.finished()
+            && (inflater.needsInput() || inflater.needsDictionary())) {
+          // Every input byte has been consumed (or a preset dictionary is required) yet the
+          // stream has not ended, so another iteration could never make progress.
+          throw new IOException("Unable to uncompress " + inBytes.length
+              + " bytes: compressed data is truncated or requires a preset dictionary");
+        }
+        bos.write(buffer, 0, count);
+      }
+      return bos.toByteArray();
+    } finally {
+      // Release native zlib memory now instead of waiting for finalization.
+      inflater.end();
+    }
+  }
+
+  /** Matches any single non-word character, i.e. anything outside [a-zA-Z_0-9]. */
+  private static final Pattern NON_WORD_CHAR_PATTERN = Pattern.compile("\\W");
+
+  /** Maximum length of a name returned by {@link #cleanVertexName(String)}. */
+  @Private
+  public static final int MAX_VERTEX_NAME_LENGTH = 40;
+
+  /**
+   * Replaces every non-word character in {@code vertexName} with '_' and truncates the result
+   * to at most {@link #MAX_VERTEX_NAME_LENGTH} characters.
+   *
+   * @param vertexName the name to clean; must not be null
+   * @return the sanitized, possibly truncated name
+   */
+  @Private
+  public static String cleanVertexName(String vertexName) {
+    String sanitized = sanitizeString(vertexName);
+    // Bound by the sanitized length: a supplementary character (surrogate pair) is a single
+    // regex match replaced by one '_', so the sanitized string can be shorter than the input.
+    return sanitized.substring(0, Math.min(sanitized.length(), MAX_VERTEX_NAME_LENGTH));
+  }
+
+  /** Replaces each non-word character in {@code srcString} with '_'. Leading digits are kept. */
+  private static String sanitizeString(String srcString) {
+    Matcher matcher = NON_WORD_CHAR_PATTERN.matcher(srcString);
+    return matcher.replaceAll("_");
+  }
+
+  /**
+   * Redirects the {@link TezContainerLogAppender} registered on the root logger as
+   * {@link TezConfiguration#TEZ_CONTAINER_LOGGER_NAME} to the file
+   * {@link TezConfiguration#TEZ_CONTAINER_LOG_FILE_NAME}, suffixed with "_" + {@code addend}
+   * when {@code addend} is non-empty. If no such appender exists, or it is of another type, a
+   * warning is logged and logging is left unchanged.
+   *
+   * @param addend suffix for the log file name; null or empty keeps the base name
+   */
+  public static void updateLoggers(String addend) throws FileNotFoundException {
+    LOG.info("Redirecting log file based on addend: " + addend);
+
+    Appender appender = Logger.getRootLogger().getAppender(
+        TezConfiguration.TEZ_CONTAINER_LOGGER_NAME);
+    if (appender != null) {
+      if (appender instanceof TezContainerLogAppender) {
+        TezContainerLogAppender claAppender = (TezContainerLogAppender) appender;
+        claAppender.setLogFileName(constructLogFileName(
+            TezConfiguration.TEZ_CONTAINER_LOG_FILE_NAME, addend));
+        claAppender.activateOptions();
+      } else {
+        LOG.warn("Appender is a " + appender.getClass() + "; require an instance of "
+            + TezContainerLogAppender.class.getName() + " to reconfigure the logger output");
+      }
+    } else {
+      LOG.warn("Not configured with appender named: " + TezConfiguration.TEZ_CONTAINER_LOGGER_NAME
+          + ". Cannot reconfigure logger output");
+    }
+  }
+
+  /** Returns {@code base}, or {@code base + "_" + addend} when {@code addend} is non-empty. */
+  private static String constructLogFileName(String base, String addend) {
+    if (addend == null || addend.isEmpty()) {
+      return base;
+    } else {
+      return base + "_" + addend;
+    }
+  }
+
+  /**
+   * Inverse of {@link #toByteArray(BitSet)}: bit {@code i} is set when bit position
+   * {@code i % 8} of {@code bytes[bytes.length - 1 - i / 8]} is set.
+   *
+   * @param bytes encoded bits; null yields an empty BitSet
+   * @return the decoded BitSet
+   */
+  public static BitSet fromByteArray(byte[] bytes) {
+    if (bytes == null) {
+      return new BitSet();
+    }
+    BitSet bits = new BitSet();
+    for (int i = 0; i < bytes.length * 8; i++) {
+      if ((bytes[(bytes.length) - (i / 8) - 1] & (1 << (i % 8))) != 0) {
+        bits.set(i);
+      }
+    }
+    return bits;
+  }
+
+  /**
+   * Encodes {@code bits} into {@code bits.length() / 8 + 1} bytes, storing bit {@code i} in
+   * byte {@code bytes.length - 1 - i / 8} at bit position {@code i % 8}, so the last byte holds
+   * the lowest-numbered bits.
+   *
+   * @param bits the bits to encode; may be null
+   * @return the encoded bytes, or null if {@code bits} is null
+   */
+  public static byte[] toByteArray(BitSet bits) {
+    if (bits == null) {
+      return null;
+    }
+    byte[] bytes = new byte[bits.length() / 8 + 1];
+    for (int i = 0; i < bits.length(); i++) {
+      if (bits.get(i)) {
+        bytes[(bytes.length) - (i / 8) - 1] |= 1 << (i % 8);
+      }
+    }
+    return bytes;
+  }
+
+  /**
+   * Picks one of the comma-separated directories in the
+   * {@link Environment#LOG_DIRS} environment variable at random.
+   *
+   * @return a log directory, or null if the variable is unset, empty or lists no directories
+   */
+  public static String getContainerLogDir() {
+    String logDirsStr = System.getenv(Environment.LOG_DIRS.name());
+    if (logDirsStr == null || logDirsStr.isEmpty()) {
+      return null;
+    }
+    String[] logDirs = StringUtils.split(logDirsStr, ',');
+    if (logDirs.length == 0) {
+      return null;
+    }
+    int logIndex = RANDOM.nextInt(logDirs.length);
+    return logDirs[logIndex];
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
index 852d6dc..e1036a5 100644
--- a/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
+++ b/tez-common/src/test/java/org/apache/tez/common/TestTezUtils.java
@@ -56,10 +56,10 @@ public class TestTezUtils {
@Test
public void testCleanVertexName() {
String testString = "special characters & spaces and longer than "
- + TezUtils.MAX_VERTEX_NAME_LENGTH + " characters";
- Assert.assertTrue(testString.length() > TezUtils.MAX_VERTEX_NAME_LENGTH);
- String cleaned = TezUtils.cleanVertexName(testString);
- Assert.assertTrue(cleaned.length() <= TezUtils.MAX_VERTEX_NAME_LENGTH);
+ + TezUtilsInternal.MAX_VERTEX_NAME_LENGTH + " characters";
+ Assert.assertTrue(testString.length() > TezUtilsInternal.MAX_VERTEX_NAME_LENGTH);
+ String cleaned = TezUtilsInternal.cleanVertexName(testString);
+ Assert.assertTrue(cleaned.length() <= TezUtilsInternal.MAX_VERTEX_NAME_LENGTH);
Assert.assertFalse(cleaned.contains("\\s+"));
Assert.assertTrue(cleaned.matches("\\w+"));
}
@@ -67,39 +67,39 @@ public class TestTezUtils {
@Test
public void testBitSetToByteArray() {
BitSet bitSet = createBitSet(0);
- byte[] bytes = TezUtils.toByteArray(bitSet);
+ byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
bitSet = createBitSet(1000);
- bytes = TezUtils.toByteArray(bitSet);
+ bytes = TezUtilsInternal.toByteArray(bitSet);
Assert.assertTrue(bytes.length == ((bitSet.length() / 8) + 1));
}
@Test
public void testBitSetFromByteArray() {
BitSet bitSet = createBitSet(0);
- byte[] bytes = TezUtils.toByteArray(bitSet);
- Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
- Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+ byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
+ Assert.assertEquals(TezUtilsInternal.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+ Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
bitSet = createBitSet(1);
- bytes = TezUtils.toByteArray(bitSet);
- Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
- Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+ bytes = TezUtilsInternal.toByteArray(bitSet);
+ Assert.assertEquals(TezUtilsInternal.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+ Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
bitSet = createBitSet(1000);
- bytes = TezUtils.toByteArray(bitSet);
- Assert.assertEquals(TezUtils.fromByteArray(bytes).cardinality(), bitSet.cardinality());
- Assert.assertTrue(TezUtils.fromByteArray(bytes).equals(bitSet));
+ bytes = TezUtilsInternal.toByteArray(bitSet);
+ Assert.assertEquals(TezUtilsInternal.fromByteArray(bytes).cardinality(), bitSet.cardinality());
+ Assert.assertTrue(TezUtilsInternal.fromByteArray(bytes).equals(bitSet));
}
@Test
public void testBitSetConversion() {
for (int i = 0 ; i < 16 ; i++) {
BitSet bitSet = createBitSetWithSingleEntry(i);
- byte[] bytes = TezUtils.toByteArray(bitSet);
+ byte[] bytes = TezUtilsInternal.toByteArray(bitSet);
- BitSet deseraialized = TezUtils.fromByteArray(bytes);
+ BitSet deseraialized = TezUtilsInternal.fromByteArray(bytes);
Assert.assertEquals(bitSet, deseraialized);
Assert.assertEquals(bitSet.cardinality(), deseraialized.cardinality());
Assert.assertEquals(1, deseraialized.cardinality());
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 3757c62..e5143fb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -90,7 +90,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezConverterUtils;
-import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.impl.LogUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
@@ -546,7 +546,7 @@ public class DAGAppMaster extends AbstractService {
private void _updateLoggers(DAG dag, String appender) {
try {
- TezUtils.updateLoggers(dag.getID().toString() + appender);
+ TezUtilsInternal.updateLoggers(dag.getID().toString() + appender);
} catch (FileNotFoundException e) {
LOG.warn("Unable to update the logger. Continue with the old logger", e );
}
@@ -1866,7 +1866,7 @@ public class DAGAppMaster extends AbstractService {
InterruptedException {
final Configuration conf = new Configuration(new YarnConfiguration());
- TezUtils.addUserSpecifiedTezConfiguration(appMaster.workingDirectory, conf);
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(appMaster.workingDirectory, conf);
// Do not automatically close FileSystem objects so that in case of
// SIGTERM I have a chance to write out the job history. I'll be closing
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 8ef1920..1cbf516 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.logging.HistoryLoggingService;
@@ -62,7 +62,7 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
String logDirPath = conf.get(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR);
final String logFileName = LOG_FILE_NAME_PREFIX + "." + appContext.getApplicationAttemptId();
if (logDirPath == null || logDirPath.isEmpty()) {
- String logDir = TezUtils.getContainerLogDir();
+ String logDir = TezUtilsInternal.getContainerLogDir();
LOG.info("Log file location for SimpleHistoryLoggingService not specified, defaulting to"
+ " containerLogDir=" + logDir);
Path p;
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
index d005dbd..1ff393b 100644
--- a/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
+++ b/tez-dag/src/main/java/org/apache/tez/runtime/task/TezChild.java
@@ -52,7 +52,7 @@ import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezLocalResource;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
-import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
@@ -175,7 +175,7 @@ public class TezChild {
while (!executor.isTerminated()) {
if (taskCount > 0) {
- TezUtils.updateLoggers("");
+ TezUtilsInternal.updateLoggers("");
}
ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
ContainerTask containerTask = null;
@@ -201,7 +201,7 @@ public class TezChild {
} else {
String loggerAddend = containerTask.getTaskSpec().getTaskAttemptID().toString();
taskCount++;
- TezUtils.updateLoggers(loggerAddend);
+ TezUtilsInternal.updateLoggers(loggerAddend);
FileSystem.clearStatistics();
childUGI = handleNewTaskCredentials(containerTask, childUGI);
@@ -389,7 +389,7 @@ public class TezChild {
// Pull in configuration specified for the session.
// TODO TEZ-1233. This needs to be moved over the wire rather than localizing the file
// for each and every task, and reading it back from disk. Also needs to be per vertex.
- TezUtils.addUserSpecifiedTezConfiguration(workingDirectory, conf);
+ TezUtilsInternal.addUserSpecifiedTezConfiguration(workingDirectory, conf);
UserGroupInformation.setConfiguration(conf);
Limits.setConfiguration(conf);
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
index 3a081bb..4a894a3 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWord.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -69,7 +70,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
@@ -167,7 +167,7 @@ public class FilterLinesByWord extends Configured implements Tool {
stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
stage2Conf.setBoolean("mapred.mapper.new-api", false);
- UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+ UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
// Setup stage1 Vertex
Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
@@ -188,12 +188,12 @@ public class FilterLinesByWord extends Configured implements Tool {
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
FilterByWordOutputProcessor.class.getName()).setUserPayload(
- MRHelpers.createUserPayloadFromConf(stage2Conf)), 1);
+ TezUtils.createUserPayloadFromConf(stage2Conf)), 1);
stage2Vertex.setTaskLocalFiles(commonLocalResources);
// Configure the Output for stage2
OutputDescriptor od = new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf));
+ .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf));
OutputCommitterDescriptor ocd = new OutputCommitterDescriptor(MROutputCommitter.class.getName());
stage2Vertex.addDataSink("MROutput", new DataSinkDescriptor(od, ocd, null));
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
index a77713f..63a762d 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/FilterLinesByWordOneToOne.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -62,7 +63,6 @@ import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.mapreduce.committer.MROutputCommitter;
import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutput;
@@ -158,7 +158,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
stage2Conf.set(FileOutputFormat.OUTDIR, outputPath);
stage2Conf.setBoolean("mapred.mapper.new-api", false);
- UserPayload stage1Payload = MRHelpers.createUserPayloadFromConf(stage1Conf);
+ UserPayload stage1Payload = TezUtils.createUserPayloadFromConf(stage1Conf);
// Setup stage1 Vertex
Vertex stage1Vertex = new Vertex("stage1", new ProcessorDescriptor(
FilterByWordInputProcessor.class.getName()).setUserPayload(stage1Payload))
@@ -178,7 +178,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
// Setup stage2 Vertex
Vertex stage2Vertex = new Vertex("stage2", new ProcessorDescriptor(
- FilterByWordOutputProcessor.class.getName()).setUserPayload(MRHelpers
+ FilterByWordOutputProcessor.class.getName()).setUserPayload(TezUtils
.createUserPayloadFromConf(stage2Conf)), dsd.getNumberOfShards());
stage2Vertex.setTaskLocalFiles(commonLocalResources);
@@ -186,7 +186,7 @@ public class FilterLinesByWordOneToOne extends Configured implements Tool {
stage2Vertex.addDataSink(
"MROutput",
new DataSinkDescriptor(new OutputDescriptor(MROutput.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(stage2Conf)),
+ .setUserPayload(TezUtils.createUserPayloadFromConf(stage2Conf)),
new OutputCommitterDescriptor(MROutputCommitter.class.getName()), null));
UnorderedUnpartitionedKVEdgeConfigurer edgeConf = UnorderedUnpartitionedKVEdgeConfigurer
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index 0562633..b22a4a8 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.client.TezClient;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
@@ -428,7 +429,7 @@ public class MRRSleepJob extends Configured implements Tool {
NullOutputFormat.class.getName());
}
- MRHelpers.translateVertexConfToTez(mapStageConf);
+ MRHelpers.translateMRConfToTez(mapStageConf);
Configuration[] intermediateReduceStageConfs = null;
if (iReduceStagesCount > 0
@@ -449,7 +450,7 @@ public class MRRSleepJob extends Configured implements Tool {
MRRSleepJobPartitioner.class.getName());
- MRHelpers.translateVertexConfToTez(iReduceStageConf);
+ MRHelpers.translateMRConfToTez(iReduceStageConf);
intermediateReduceStageConfs[i-1] = iReduceStageConf;
}
}
@@ -468,18 +469,18 @@ public class MRRSleepJob extends Configured implements Tool {
finalReduceConf.set(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
NullOutputFormat.class.getName());
- MRHelpers.translateVertexConfToTez(finalReduceConf);
+ MRHelpers.translateMRConfToTez(finalReduceConf);
}
- MRHelpers.doJobClientMagic(mapStageConf);
+ MRHelpers.configureMRApiUsage(mapStageConf);
if (iReduceStagesCount > 0
&& numIReducer > 0) {
for (int i = 0; i < iReduceStagesCount; ++i) {
- MRHelpers.doJobClientMagic(intermediateReduceStageConfs[i]);
+ MRHelpers.configureMRApiUsage(intermediateReduceStageConfs[i]);
}
}
if (numReducer > 0) {
- MRHelpers.doJobClientMagic(finalReduceConf);
+ MRHelpers.configureMRApiUsage(finalReduceConf);
}
DataSourceDescriptor dataSource = null;
@@ -520,7 +521,7 @@ public class MRRSleepJob extends Configured implements Tool {
List<Vertex> vertices = new ArrayList<Vertex>();
- UserPayload mapUserPayload = MRHelpers.createUserPayloadFromConf(mapStageConf);
+ UserPayload mapUserPayload = TezUtils.createUserPayloadFromConf(mapStageConf);
int numTasks = generateSplitsInAM ? -1 : numMapper;
Vertex mapVertex = new Vertex("map", new ProcessorDescriptor(
@@ -534,7 +535,7 @@ public class MRRSleepJob extends Configured implements Tool {
for (int i = 0; i < iReduceStagesCount; ++i) {
Configuration iconf =
intermediateReduceStageConfs[i];
- UserPayload iReduceUserPayload = MRHelpers.createUserPayloadFromConf(iconf);
+ UserPayload iReduceUserPayload = TezUtils.createUserPayloadFromConf(iconf);
Vertex ivertex = new Vertex("ireduce" + (i+1),
new ProcessorDescriptor(ReduceProcessor.class.getName()).
setUserPayload(iReduceUserPayload), numIReducer);
@@ -545,7 +546,7 @@ public class MRRSleepJob extends Configured implements Tool {
Vertex finalReduceVertex = null;
if (numReducer > 0) {
- UserPayload reducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+ UserPayload reducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
finalReduceVertex = new Vertex("reduce", new ProcessorDescriptor(
ReduceProcessor.class.getName()).setUserPayload(reducePayload), numReducer);
finalReduceVertex.setTaskLocalFiles(commonLocalResources);
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
index 255dcbd..72c14fc 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
@@ -155,7 +155,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
mapStageConf.set(MRJobConfig.MAP_CLASS_ATTR,
TokenizerMapper.class.getName());
- MRHelpers.translateVertexConfToTez(mapStageConf);
+ MRHelpers.translateMRConfToTez(mapStageConf);
Configuration iReduceStageConf = new JobConf(conf);
// TODO replace with auto-reduce parallelism
@@ -166,7 +166,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
IntWritable.class.getName());
iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
- MRHelpers.translateVertexConfToTez(iReduceStageConf);
+ MRHelpers.translateMRConfToTez(iReduceStageConf);
Configuration finalReduceConf = new JobConf(conf);
finalReduceConf.setInt(MRJobConfig.NUM_REDUCES, 1);
@@ -174,11 +174,11 @@ public class TestOrderedWordCount extends Configured implements Tool {
MyOrderByNoOpReducer.class.getName());
finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
- MRHelpers.translateVertexConfToTez(finalReduceConf);
+ MRHelpers.translateMRConfToTez(finalReduceConf);
- MRHelpers.doJobClientMagic(mapStageConf);
- MRHelpers.doJobClientMagic(iReduceStageConf);
- MRHelpers.doJobClientMagic(finalReduceConf);
+ MRHelpers.configureMRApiUsage(mapStageConf);
+ MRHelpers.configureMRApiUsage(iReduceStageConf);
+ MRHelpers.configureMRApiUsage(finalReduceConf);
List<Vertex> vertices = new ArrayList<Vertex>();
@@ -197,7 +197,8 @@ public class TestOrderedWordCount extends Configured implements Tool {
}
Vertex mapVertex = new Vertex("initialmap", new ProcessorDescriptor(
- MapProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(mapStageConf))
+ MapProcessor.class.getName()).setUserPayload(
+ TezUtils.createUserPayloadFromConf(mapStageConf))
.setHistoryText(mapStageHistoryText)).setTaskLocalFiles(commonLocalResources);
mapVertex.addDataSource("MRInput", dsd);
vertices.add(mapVertex);
@@ -207,7 +208,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
String iReduceStageHistoryText = new String(iROutputStream.toByteArray(), "UTF-8");
Vertex ivertex = new Vertex("intermediate_reducer", new ProcessorDescriptor(
ReduceProcessor.class.getName())
- .setUserPayload(MRHelpers.createUserPayloadFromConf(iReduceStageConf))
+ .setUserPayload(TezUtils.createUserPayloadFromConf(iReduceStageConf))
.setHistoryText(iReduceStageHistoryText), 2);
ivertex.setTaskLocalFiles(commonLocalResources);
vertices.add(ivertex);
@@ -215,7 +216,7 @@ public class TestOrderedWordCount extends Configured implements Tool {
ByteArrayOutputStream finalReduceOutputStream = new ByteArrayOutputStream(4096);
finalReduceConf.writeXml(finalReduceOutputStream);
String finalReduceStageHistoryText = new String(finalReduceOutputStream.toByteArray(), "UTF-8");
- UserPayload finalReducePayload = MRHelpers.createUserPayloadFromConf(finalReduceConf);
+ UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf(finalReduceConf);
Vertex finalReduceVertex = new Vertex("finalreduce",
new ProcessorDescriptor(
ReduceProcessor.class.getName())
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
index f2b7043..aa7b836 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/processor/FilterByWordInputProcessor.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.examples.FilterLinesByWord;
import org.apache.tez.mapreduce.examples.FilterLinesByWord.TextLongPair;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
index 2a0a1e0..107d63c 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/client/YARNRunner.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.client.MRTezClient;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -379,7 +380,7 @@ public class YARNRunner implements ClientProtocol {
MRJobConfig.MAPRED_ADMIN_USER_ENV);
}
- MRHelpers.updateEnvironmentForMRTasks(jobConf, environment, isMap);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(jobConf, environment, isMap);
}
private Vertex createVertexForStage(Configuration stageConf,
@@ -408,12 +409,12 @@ public class YARNRunner implements ClientProtocol {
}
}
- Resource taskResource = isMap ? MRHelpers.getMapResource(stageConf)
- : MRHelpers.getReduceResource(stageConf);
+ Resource taskResource = isMap ? MRHelpers.getResourceForMRMapper(stageConf)
+ : MRHelpers.getResourceForMRReducer(stageConf);
stageConf.set(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX, "part");
- UserPayload vertexUserPayload = MRHelpers.createUserPayloadFromConf(stageConf);
+ UserPayload vertexUserPayload = TezUtils.createUserPayloadFromConf(stageConf);
Vertex vertex = new Vertex(vertexName, new ProcessorDescriptor(processorName).setUserPayload(vertexUserPayload),
numTasks, taskResource);
if (isMap) {
@@ -437,8 +438,8 @@ public class YARNRunner implements ClientProtocol {
// here
taskLocalResources.putAll(jobLocalResources);
- String taskJavaOpts = isMap ? MRHelpers.getMapJavaOpts(stageConf)
- : MRHelpers.getReduceJavaOpts(stageConf);
+ String taskJavaOpts = isMap ? MRHelpers.getJavaOptsForMRMapper(stageConf)
+ : MRHelpers.getJavaOptsForMRReducer(stageConf);
vertex.setTaskEnvironment(taskEnv)
.setTaskLocalFiles(taskLocalResources)
@@ -561,7 +562,7 @@ public class YARNRunner implements ClientProtocol {
// Transform all confs to use Tez keys
for (int i = 0; i < stageConfs.length; i++) {
- MRHelpers.translateVertexConfToTez(stageConfs[i]);
+ MRHelpers.translateMRConfToTez(stageConfs[i]);
}
// create inputs to tezClient.submit()
@@ -600,7 +601,7 @@ public class YARNRunner implements ClientProtocol {
Map<String, String> environment = new HashMap<String, String>();
// Setup the environment variables for AM
- MRHelpers.updateEnvironmentForMRAM(conf, environment);
+ MRHelpers.updateEnvBasedOnMRAMEnv(conf, environment);
StringBuilder envStrBuilder = new StringBuilder();
boolean first = true;
for (Entry<String, String> entry : environment.entrySet()) {
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index 021c82d..ba60074 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -33,11 +33,11 @@ import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
@@ -65,7 +65,7 @@ public class MROutputCommitter extends OutputCommitter {
jobConf = new JobConf();
} else {
jobConf = new JobConf(
- MRHelpers.createConfFromUserPayload(userPayload));
+ TezUtils.createConfFromUserPayload(userPayload));
}
// Read all credentials into the credentials instance stored in JobConf.
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index c03d4bb..e418306 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -28,10 +28,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -72,7 +71,7 @@ public class MRInputAMSplitGenerator extends InputInitializer {
if (LOG.isDebugEnabled()) {
sw.reset().start();
}
- Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+ Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
sendSerializedEvents = conf.getBoolean(
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
index 7f0ec4e..4555c43 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputSplitDistributor.java
@@ -25,7 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.common.TezUtils;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.mapreduce.lib.MRInputUtils;
@@ -67,7 +67,7 @@ public class MRInputSplitDistributor extends InputInitializer {
LOG.debug("Time to parse MRInput payload into prot: "
+ sw.elapsedMillis());
}
- Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
+ Configuration conf = TezUtils.createConfFromByteString(userPayloadProto
.getConfigurationBytes());
JobConf jobConf = new JobConf(conf);
boolean useNewApi = jobConf.getUseNewMapper();
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index c100177..0ba6e38 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -20,57 +20,96 @@ package org.apache.tez.mapreduce.hadoop;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
-import java.util.List;
import java.util.Map;
import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezYARNUtils;
-import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.mapreduce.combine.MRCombiner;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-import com.google.protobuf.ByteString;
-
+/**
+ * This class contains helper methods for frameworks which migrate from MapReduce to Tez, and need
+ * to continue to work with existing MapReduce configurations.
+ */
public class MRHelpers {
private static final Log LOG = LogFactory.getLog(MRHelpers.class);
-
-
/**
- * Translates MR keys to Tez for the provided conf. The conversion is
- * done in place.
+ * Translate MapReduce configuration keys to the equivalent Tez keys in the provided
+ * configuration. The translation is done in place. If they are not already set, the Tez
+ * partitioner key (and, depending on the configured MR combiner, the Tez combiner key) are
+ * also defaulted to the MR-compatible implementations.
+ * <p>
+ * This method is meant to be used by frameworks which rely upon existing MapReduce configuration
+ * instead of setting up their own.
*
- * @param conf
- * mr based configuration to be translated to tez
+ * @param conf mr based configuration to be translated to tez; modified in place
*/
- @LimitedPrivate("Hive, Pig")
- @Unstable
- public static void translateVertexConfToTez(Configuration conf) {
+ public static void translateMRConfToTez(Configuration conf) {
convertVertexConfToTez(conf);
}
+ /**
+ * Update the provided configuration to use the new API (mapreduce) or the old API (mapred) on
+ * the map side. If "mapred.mapper.new-api" is not already set, the new API is selected unless
+ * an old-API mapper ("mapred.mapper.class") is configured.
+ * <p>
+ * Also ensures that the InputFormat and Mapper keys belonging to the other API are not present.
+ * Reducer, Partitioner and OutputFormat keys are not examined.
+ * <p>
+ * This method should be invoked after completely setting up the configuration.
+ *
+ * @param conf configuration to update in place
+ * @throws TezUncheckedException if an InputFormat or Mapper key of the other API is set
+ */
+ public static void configureMRApiUsage(Configuration conf) {
+ String oldMapperClass = "mapred.mapper.class";
+ conf.setBooleanIfUnset("mapred.mapper.new-api", conf.get(oldMapperClass) == null);
+ try {
+ if (conf.getBoolean("mapred.mapper.new-api", false)) {
+ String mode = "new map API";
+ ensureNotSet(conf, "mapred.input.format.class", mode);
+ ensureNotSet(conf, oldMapperClass, mode);
+ } else {
+ String mode = "map compatibility";
+ ensureNotSet(conf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
+ ensureNotSet(conf, MRJobConfig.MAP_CLASS_ATTR, mode);
+ }
+ } catch (IOException e) {
+ throw new TezUncheckedException(e);
+ }
+ }
+
private static void convertVertexConfToTez(Configuration vertexConf) {
setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown");
processDirectConversion(vertexConf);
+ setupMRComponents(vertexConf);
+ }
+
+ /**
+ * Whether an MR combiner class is configured for the API selected by "mapred.mapper.new-api".
+ * While that key is still unset, a combiner configured through either API counts.
+ */
+ private static boolean isMRCombinerConfigured(Configuration conf) {
+ String newApiKey = "mapred.mapper.new-api";
+ boolean newApiCombinerSet = conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null;
+ boolean oldApiCombinerSet = conf.get("mapred.combiner.class") != null;
+ if (conf.get(newApiKey) == null) {
+ // The API may not have been chosen yet: MRInput calls translateMRConfToTez() before
+ // configureMRApiUsage(), so a missing flag must not drop a configured combiner.
+ return newApiCombinerSet || oldApiCombinerSet;
+ }
+ return conf.getBoolean(newApiKey, false) ? newApiCombinerSet : oldApiCombinerSet;
+ }
+
+ /**
+ * Default the Tez partitioner to {@link MRPartitioner} and, when an MR combiner class is
+ * configured, the Tez combiner to {@link MRCombiner}. Tez keys that are already set are left
+ * untouched.
+ *
+ * @param conf configuration to update in place
+ */
+ private static void setupMRComponents(Configuration conf) {
+ if (conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS) == null) {
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
+ MRPartitioner.class.getName());
+ }
+
+ if (conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS) == null
+ && isMRCombinerConfigured(conf)) {
+ conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
+ }
}
/**
@@ -153,10 +192,17 @@ public class MRHelpers {
}
}
+ /**
+ * Fail if {@code attr} has a value in {@code conf}.
+ *
+ * @param conf configuration to inspect
+ * @param attr key that must not be set
+ * @param msg name of the API mode the key conflicts with, used in the error message
+ * @throws IOException if {@code attr} is set
+ */
+ private static void ensureNotSet(Configuration conf, String attr, String msg)
+ throws IOException {
+ if (conf.get(attr) != null) {
+ throw new IOException(attr + " is incompatible with " + msg + " mode.");
+ }
+ }
+
private static String getLog4jCmdLineProperties(Configuration conf,
boolean isMap) {
Vector<String> logProps = new Vector<String>(4);
- addLog4jSystemProperties(getChildLogLevel(conf, isMap), logProps);
+ TezUtils.addLog4jSystemProperties(getChildLogLevel(conf, isMap), logProps);
StringBuilder sb = new StringBuilder();
for (String str : logProps) {
sb.append(str).append(" ");
@@ -165,21 +211,34 @@ public class MRHelpers {
}
/**
- * Add the JVM system properties necessary to configure
- * {@link ContainerLogAppender}.
+ * Generate JVM options based on MapReduce AM java options.
+ * <p>
+ * This is only meant to be used if frameworks are not setting up their own java options or
+ * relying on the defaults specified by Tez, and instead want to use the options which may already
+ * have been configured for an MR AppMaster.
+ * <p>
+ * The result is the trimmed admin options ({@code MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS}) followed
+ * by a single space and the trimmed user options ({@code MRJobConfig.MR_AM_COMMAND_OPTS}); each
+ * falls back to its MRJobConfig default when unset.
*
- * @param logLevel
- * the desired log level (eg INFO/WARN/DEBUG)
- * @param vargs
- * the argument list to append to
+ * @param conf Configuration to be used to extract JVM opts specific info
+ * @return JAVA_OPTS string to be used in launching the JVM
*/
- public static void addLog4jSystemProperties(String logLevel,
- List<String> vargs) {
- TezClientUtils.addLog4jSystemProperties(logLevel, vargs);
+ public static String getJavaOptsForMRAM(Configuration conf) {
+ // Admin opts
+ String mrAppMasterAdminOptions = conf.get(
+ MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
+ MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
+ // Add AM user command opts
+ String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
+ MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
+
+ // Admin options are emitted first, user options after them.
+ return mrAppMasterAdminOptions.trim()
+ + " " + mrAppMasterUserOptions.trim();
}
/**
- * Generate JVM options to be used to launch map tasks
+ * Generate JVM options based on MapReduce mapper java options. </p>
+ *
+ * This is only meant to be used if frameworks are not setting up their own java options,
+ * and would like to fallback to using java options which may already be configured for
+ * Hadoop MapReduce mappers.
*
* Uses mapreduce.admin.map.child.java.opts, mapreduce.map.java.opts and
* mapreduce.map.log.level from config to generate the opts.
@@ -188,7 +247,7 @@ public class MRHelpers {
* @return JAVA_OPTS string to be used in launching the JVM
*/
@SuppressWarnings("deprecation")
- public static String getMapJavaOpts(Configuration conf) {
+ public static String getJavaOptsForMRMapper(Configuration conf) {
String adminOpts = conf.get(
MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
@@ -204,7 +263,11 @@ public class MRHelpers {
}
/**
- * Generate JVM options to be used to launch reduce tasks
+ * Generate JVM options based on MapReduce reducer java options. </p>
+ *
+ * This is only meant to be used if frameworks are not setting up their own java options,
+ * and would like to fallback to using java options which may already be configured for
+ * Hadoop MapReduce reducers.
*
* Uses mapreduce.admin.reduce.child.java.opts, mapreduce.reduce.java.opts
* and mapreduce.reduce.log.level from config to generate the opts.
@@ -213,7 +276,7 @@ public class MRHelpers {
* @return JAVA_OPTS string to be used in launching the JVM
*/
@SuppressWarnings("deprecation")
- public static String getReduceJavaOpts(Configuration conf) {
+ public static String getJavaOptsForMRReducer(Configuration conf) {
String adminOpts = conf.get(
MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS);
@@ -229,146 +292,12 @@ public class MRHelpers {
}
/**
- * Sets up parameters which used to be set by the MR JobClient. Includes
- * setting whether to use the new api or the old api. Note: Must be called
- * before generating InputSplits
- *
- * @param conf
- * configuration for the vertex.
- */
- public static void doJobClientMagic(Configuration conf) throws IOException {
- setUseNewAPI(conf);
- // TODO Maybe add functionality to check output specifications - e.g. fail
- // early if the output directory exists.
- InetAddress ip = InetAddress.getLocalHost();
- if (ip != null) {
- String submitHostAddress = ip.getHostAddress();
- String submitHostName = ip.getHostName();
- conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
- conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
- }
- // conf.set("hadoop.http.filter.initializers",
- // "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
- // Skipping setting JOB_DIR - not used by AM.
-
- // Maybe generate SHUFFLE secret. The AM uses the job token generated in
- // the AM anyway.
-
- // TODO eventually ACLs
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
-
- boolean useNewApi = conf.getBoolean("mapred.mapper.new-api", false);
- if (useNewApi) {
- if (conf.get(MRJobConfig.COMBINE_CLASS_ATTR) != null) {
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
- }
- } else {
- if (conf.get("mapred.combiner.class") != null) {
- conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
- }
- }
-
- setWorkingDirectory(conf);
- }
-
- private static void setWorkingDirectory(Configuration conf) {
- String name = conf.get(JobContext.WORKING_DIR);
- if (name == null) {
- try {
- Path dir = FileSystem.get(conf).getWorkingDirectory();
- conf.set(JobContext.WORKING_DIR, dir.toString());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /**
- * Default to the new APIs unless they are explicitly set or the old mapper or
- * reduce attributes are used.
- *
- * @throws IOException
- * if the configuration is inconsistant
- */
- private static void setUseNewAPI(Configuration conf) throws IOException {
- int numReduces = conf.getInt(MRJobConfig.NUM_REDUCES, 1);
- String oldMapperClass = "mapred.mapper.class";
- String oldReduceClass = "mapred.reducer.class";
- conf.setBooleanIfUnset("mapred.mapper.new-api",
- conf.get(oldMapperClass) == null);
- if (conf.getBoolean("mapred.mapper.new-api", false)) {
- String mode = "new map API";
- ensureNotSet(conf, "mapred.input.format.class", mode);
- ensureNotSet(conf, oldMapperClass, mode);
- if (numReduces != 0) {
- ensureNotSet(conf, "mapred.partitioner.class", mode);
- } else {
- ensureNotSet(conf, "mapred.output.format.class", mode);
- }
- } else {
- String mode = "map compatability";
- ensureNotSet(conf, MRJobConfig.INPUT_FORMAT_CLASS_ATTR, mode);
- ensureNotSet(conf, MRJobConfig.MAP_CLASS_ATTR, mode);
- if (numReduces != 0) {
- ensureNotSet(conf, MRJobConfig.PARTITIONER_CLASS_ATTR, mode);
- } else {
- ensureNotSet(conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
- }
- }
- if (numReduces != 0) {
- conf.setBooleanIfUnset("mapred.reducer.new-api",
- conf.get(oldReduceClass) == null);
- if (conf.getBoolean("mapred.reducer.new-api", false)) {
- String mode = "new reduce API";
- ensureNotSet(conf, "mapred.output.format.class", mode);
- ensureNotSet(conf, oldReduceClass, mode);
- } else {
- String mode = "reduce compatability";
- ensureNotSet(conf, MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, mode);
- ensureNotSet(conf, MRJobConfig.REDUCE_CLASS_ATTR, mode);
- }
- }
- }
-
- private static void ensureNotSet(Configuration conf, String attr, String msg)
- throws IOException {
- if (conf.get(attr) != null) {
- throw new IOException(attr + " is incompatible with " + msg + " mode.");
- }
- }
-
- @LimitedPrivate("Hive, Pig")
- @Unstable
- public static UserPayload createUserPayloadFromConf(Configuration conf)
- throws IOException {
- return TezUtils.createUserPayloadFromConf(conf);
- }
-
- @LimitedPrivate("Hive, Pig")
- public static ByteString createByteStringFromConf(Configuration conf)
- throws IOException {
- return TezUtils.createByteStringFromConf(conf);
- }
-
- @LimitedPrivate("Hive, Pig")
- @Unstable
- public static Configuration createConfFromUserPayload(UserPayload payload)
- throws IOException {
- return TezUtils.createConfFromUserPayload(payload);
- }
-
- @LimitedPrivate("Hive, Pig")
- public static Configuration createConfFromByteString(ByteString bs)
- throws IOException {
- return TezUtils.createConfFromByteString(bs);
- }
-
- /**
- * Extract the map task's container resource requirements from the
- * provided configuration.
- *
- * Uses mapreduce.map.memory.mb and mapreduce.map.cpu.vcores from the
- * provided configuration.
+ * Extract the container resource requirements from the provided configuration, which would
+ * otherwise have been used when running a Hadoop MapReduce mapper. </p>
+ * <p/>
+ * This is only meant to be used if frameworks are not setting up their own {@link
+ * org.apache.hadoop.yarn.api.records.Resource} and would like to fallback to using resources
+ * which may already be configured for Hadoop MapReduce mappers.
*
* @param conf Configuration with MR specific settings used to extract
* information from
@@ -376,7 +305,7 @@ public class MRHelpers {
* @return Resource object used to define requirements for containers
* running Map tasks
*/
- public static Resource getMapResource(Configuration conf) {
+ public static Resource getResourceForMRMapper(Configuration conf) {
return Resource.newInstance(conf.getInt(
MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB),
conf.getInt(MRJobConfig.MAP_CPU_VCORES,
@@ -384,33 +313,38 @@ public class MRHelpers {
}
/**
- * Extract the reduce task's container resource requirements from the
- * provided configuration.
- *
+ * Extract the container resource requirements from the provided configuration, which would
+ * otherwise have been used when running a Hadoop MapReduce reducer.
+ * <p>
+ * This is only meant to be used if frameworks are not setting up their own {@link
+ * org.apache.hadoop.yarn.api.records.Resource} and would like to fall back to using resources
+ * which may already be configured for Hadoop MapReduce reducers. Unset values fall back to the
+ * corresponding MRJobConfig defaults.
+ * <p>
* Uses mapreduce.reduce.memory.mb and mapreduce.reduce.cpu.vcores from the
* provided configuration.
*
* @param conf Configuration with MR specific settings used to extract
- * information from
- *
+ * information from
* @return Resource object used to define requirements for containers
* running Reduce tasks
*/
- public static Resource getReduceResource(Configuration conf) {
+ public static Resource getResourceForMRReducer(Configuration conf) {
return Resource.newInstance(conf.getInt(
- MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
+ MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB),
conf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
MRJobConfig.DEFAULT_REDUCE_CPU_VCORES));
}
/**
- * Setup classpath and other environment variables
- * @param conf Configuration to retrieve settings from
+ * Setup classpath and other environment variables based on the configured values for MR Mappers
+ * or Reducers
+ *
+ * @param conf Configuration to retrieve settings from
* @param environment Environment to update
- * @param isMap Whether task is a map or reduce task
+ * @param isMap Whether task is a map or reduce task
*/
- public static void updateEnvironmentForMRTasks(Configuration conf,
- Map<String, String> environment, boolean isMap) {
+ public static void updateEnvBasedOnMRTaskEnv(Configuration conf,
+ Map<String, String> environment, boolean isMap) {
// Shell
environment.put(Environment.SHELL.name(), conf.get(
MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL));
@@ -437,58 +371,16 @@ public class MRHelpers {
getChildLogLevel(conf, isMap) + ",CLA");
}
- private static Configuration getBaseJobConf(Configuration conf) {
- if (conf != null) {
- return new JobConf(conf);
- } else {
- return new JobConf();
- }
- }
-
- /**
- * Get default initialize JobConf-based configuration
- * @param conf Conf to initialize JobConf with.
- * @return Base configuration for MR-based jobs
- */
- public static Configuration getBaseMRConfiguration(Configuration conf) {
- return getBaseJobConf(conf);
- }
-
/**
- * Get default initialize JobConf-based configuration
- * @return Base configuration for MR-based jobs
- */
- static Configuration getBaseMRConfiguration() {
- return getBaseJobConf(null);
- }
-
- /**
- * Setup environment for the AM based on MR-based configuration
+ * Setup environment variables based on the configured values for the MR AM.
+ * <p>
+ * Values from {@code MRJobConfig.MR_AM_ADMIN_USER_ENV} are applied first, followed by
+ * {@code MRJobConfig.MR_AM_ENV}; both are passed to
+ * {@code TezYARNUtils.appendToEnvFromInputString} with {@code File.pathSeparator}.
+ *
* @param conf Configuration from which to extract information
* @param environment Environment map to update
*/
- public static void updateEnvironmentForMRAM(Configuration conf, Map<String, String> environment) {
+ public static void updateEnvBasedOnMRAMEnv(Configuration conf, Map<String, String> environment) {
TezYARNUtils.appendToEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV),
File.pathSeparator);
TezYARNUtils.appendToEnvFromInputString(environment, conf.get(MRJobConfig.MR_AM_ENV),
File.pathSeparator);
}
- /**
- * Extract Java Opts for the AM based on MR-based configuration
- * @param conf Configuration from which to extract information
- * @return Java opts for the AM
- */
- public static String getMRAMJavaOpts(Configuration conf) {
- // Admin opts
- String mrAppMasterAdminOptions = conf.get(
- MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
- MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
- // Add AM user command opts
- String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
- MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
-
- return mrAppMasterAdminOptions.trim()
- + " " + mrAppMasterUserOptions.trim();
- }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/0d5fdf3d/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 6b8ed83..0143df9 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -206,8 +206,8 @@ public class MRInput extends MRInputBase {
} catch (Exception e) {
throw new TezUncheckedException(e);
}
- MRHelpers.translateVertexConfToTez(inputConf);
- MRHelpers.doJobClientMagic(inputConf);
+ MRHelpers.translateMRConfToTez(inputConf);
+ MRHelpers.configureMRApiUsage(inputConf);
UserPayload payload = MRInputHelpersInternal.createMRInputPayload(inputConf, inputSplitInfo.getSplitsProto());
Credentials credentials = null;
if (getCredentialsForSourceFilesystem && inputSplitInfo.getCredentials() != null) {
@@ -224,8 +224,8 @@ public class MRInput extends MRInputBase {
Configuration inputConf = new JobConf(conf);
setupBasicConf(inputConf);
- MRHelpers.translateVertexConfToTez(inputConf);
- MRHelpers.doJobClientMagic(inputConf);
+ MRHelpers.translateMRConfToTez(inputConf);
+ MRHelpers.configureMRApiUsage(inputConf);
Credentials credentials = maybeGetCredentials();
@@ -243,8 +243,8 @@ public class MRInput extends MRInputBase {
private DataSourceDescriptor createGeneratorDataSource() throws IOException {
Configuration inputConf = new JobConf(conf);
setupBasicConf(inputConf);
- MRHelpers.translateVertexConfToTez(inputConf);
- MRHelpers.doJobClientMagic(inputConf);
+ MRHelpers.translateMRConfToTez(inputConf);
+ MRHelpers.configureMRApiUsage(inputConf);
Credentials credentials = maybeGetCredentials();