You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2022/11/26 08:04:20 UTC
[tez] branch master updated: TEZ-4396 Ensure utility classes have only private (default) constructors + several code refactors (#197) (Gergely Hanko reviewed by Laszlo Bodor)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 2fd7df4cf TEZ-4396 Ensure utility classes have only private (default) constructors + several code refactors (#197) (Gergely Hanko reviewed by Laszlo Bodor)
2fd7df4cf is described below
commit 2fd7df4cf2b22c5ed1f34c5dc1c865fc75e3d832
Author: ghanko <54...@users.noreply.github.com>
AuthorDate: Sat Nov 26 09:04:15 2022 +0100
TEZ-4396 Ensure utility classes have only private (default) constructors + several code refactors (#197) (Gergely Hanko reviewed by Laszlo Bodor)
---
.../java/org/apache/tez/client/TezClientUtils.java | 4 +-
.../java/org/apache/tez/common/ATSConstants.java | 1 +
.../main/java/org/apache/tez/common/RPCUtil.java | 25 +--
.../org/apache/tez/common/ReflectionUtils.java | 6 +-
.../java/org/apache/tez/common/TezCommonUtils.java | 21 +-
.../main/java/org/apache/tez/common/TezUtils.java | 24 +--
.../java/org/apache/tez/common/TezYARNUtils.java | 14 +-
.../org/apache/tez/common/security/Master.java | 6 +-
.../org/apache/tez/common/security/TokenCache.java | 23 +--
.../org/apache/tez/dag/api/DagTypeConverters.java | 4 +-
.../tez/dag/api/TezConfigurationConstants.java | 4 +-
.../java/org/apache/tez/dag/api/TezConstants.java | 3 +-
.../tez/dag/api/client/TimelineReaderFactory.java | 38 ++--
.../java/org/apache/tez/client/TestTezClient.java | 2 +-
.../java/org/apache/tez/common/TezTestUtils.java | 6 +-
.../org/apache/tez/common/TezUtilsInternal.java | 33 ++-
.../apache/tez/dag/utils/RelocalizationUtils.java | 7 +-
tez-dag/src/main/java/org/apache/tez/Utils.java | 6 +-
.../dag/app/dag/impl/TaskAttemptImplHelpers.java | 4 +-
.../dag/app/rm/container/AMContainerHelpers.java | 53 +++--
.../logging/impl/HistoryEventJsonConversion.java | 8 +-
.../org/apache/tez/dag/history/utils/DAGUtils.java | 4 +-
.../tez/dag/history/utils/TezEventUtils.java | 4 +-
.../java/org/apache/tez/dag/utils/ProtoUtils.java | 4 +-
.../org/apache/tez/dag/utils/TezBuilderUtils.java | 4 +-
.../apache/tez/dag/utils/TezRuntimeChildJVM.java | 14 +-
.../tez/dag/app/PluginWrapperTestHelpers.java | 21 +-
.../tez/dag/app/rm/TestTaskSchedulerHelpers.java | 27 +--
.../org/apache/tez/examples/ExampleDriver.java | 18 +-
.../tez/service/TezTestServiceConfConstants.java | 3 +-
.../java/org/apache/tez/util/ProtoConverters.java | 4 +-
.../mapreduce/split/SplitMetaInfoReaderTez.java | 7 +-
.../org/apache/tez/common/MRFrameworkConfigs.java | 4 +-
.../org/apache/tez/mapreduce/common/Utils.java | 6 +-
.../tez/mapreduce/hadoop/DeprecatedKeys.java | 44 ++--
.../apache/tez/mapreduce/hadoop/IDConverter.java | 4 +-
.../org/apache/tez/mapreduce/hadoop/MRHelpers.java | 13 +-
.../tez/mapreduce/hadoop/MRInputHelpers.java | 98 +++------
.../hadoop/MultiStageMRConfToTezTranslator.java | 12 +-
.../mapreduce/hadoop/MultiStageMRConfigUtil.java | 13 +-
.../tez/mapreduce/hadoop/TezTypeConverters.java | 10 +-
.../org/apache/tez/mapreduce/input/MRInput.java | 18 +-
.../org/apache/tez/mapreduce/lib/MRInputUtils.java | 19 +-
.../org/apache/tez/mapreduce/TezTestUtils.java | 7 +-
.../apache/tez/mapreduce/processor/MapUtils.java | 27 +--
.../org/apache/tez/history/parser/utils/Utils.java | 3 +-
.../ats/HistoryEventTimelineConversion.java | 4 +-
.../org/apache/tez/common/ProtoConverters.java | 3 +-
.../org/apache/tez/common/TezConverterUtils.java | 5 +-
.../tez/runtime/task/TaskExecutionTestHelpers.java | 29 +--
.../tez/runtime/task/TestContainerExecution.java | 2 +-
.../tez/common/TezRuntimeFrameworkConfigs.java | 4 +-
.../library/api/TezRuntimeConfiguration.java | 223 +++++++++++----------
.../tez/runtime/library/common/ConfigUtils.java | 22 +-
.../tez/runtime/library/common/Constants.java | 3 +-
.../runtime/library/common/TezRuntimeUtils.java | 74 +++----
.../common/security/SecureShuffleUtils.java | 38 ++--
.../library/common/shuffle/ShuffleUtils.java | 4 +-
.../runtime/library/common/sort/impl/IFile.java | 80 ++------
.../library/common/sort/impl/TezMerger.java | 68 +++----
.../org/apache/tez/runtime/library/conf/Utils.java | 4 +-
.../tez/runtime/library/utils/BufferUtils.java | 5 +-
.../runtime/library/utils/FastByteComparisons.java | 12 +-
.../library/common/sort/impl/TestTezMerger.java | 117 +++++------
.../runtime/library/output/OutputTestHelpers.java | 11 +-
.../tez/runtime/library/testutils/KVDataGen.java | 12 +-
.../library/testutils/RandomTextGenerator.java | 10 +-
.../tez/mapreduce/examples/ExampleDriver.java | 10 +-
.../java/org/apache/tez/test/SimpleTestDAG.java | 8 +-
.../apache/tez/test/SimpleTestDAG3Vertices.java | 4 +-
.../test/java/org/apache/tez/test/TestDriver.java | 6 +-
.../org/apache/tez/test/TestFaultTolerance.java | 26 +--
.../org/apache/tez/test/dag/FailingDagBuilder.java | 166 +++++++++++++++
.../org/apache/tez/test/dag/MultiAttemptDAG.java | 48 ++---
.../apache/tez/test/dag/SimpleReverseVTestDAG.java | 4 +-
.../org/apache/tez/test/dag/SimpleVTestDAG.java | 4 +-
.../apache/tez/test/dag/SixLevelsFailingDAG.java | 95 ---------
.../apache/tez/test/dag/ThreeLevelsFailingDAG.java | 75 -------
.../apache/tez/test/dag/TwoLevelsFailingDAG.java | 113 -----------
.../tez/analyzer/plugins/AnalyzerDriver.java | 4 +-
.../java/org/apache/tez/analyzer/utils/Utils.java | 4 +-
.../tools/javadoc/doclet/ConfigStandardDoclet.java | 12 +-
82 files changed, 865 insertions(+), 1124 deletions(-)
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index d78ccf3e8..1225181fe 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -116,11 +116,13 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@Private
-public class TezClientUtils {
+public final class TezClientUtils {
private static Logger LOG = LoggerFactory.getLogger(TezClientUtils.class);
private static final int UTF8_CHUNK_SIZE = 16 * 1024;
+ private TezClientUtils() {}
+
private static FileStatus[] getLRFileStatus(String fileName, Configuration conf) throws
IOException {
URI uri;
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 47d536fdd..e3c90d315 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -138,4 +138,5 @@ public class ATSConstants {
public static final String CALLER_TYPE = "callerType";
public static final String DESCRIPTION = "description";
+ protected ATSConstants() {}
}
diff --git a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
index caeb822bc..232ad6a7c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
+++ b/tez-api/src/main/java/org/apache/tez/common/RPCUtil.java
@@ -29,7 +29,9 @@ import org.apache.tez.dag.api.TezException;
import com.google.protobuf.ServiceException;
-public class RPCUtil {
+public final class RPCUtil {
+
+ private RPCUtil() {}
/**
* Returns an instance of {@link TezException}
@@ -55,17 +57,8 @@ public class RPCUtil {
return ex;
// RemoteException contains useful information as against the
// java.lang.reflect exceptions.
- } catch (NoSuchMethodException e) {
- throw re;
- } catch (IllegalArgumentException e) {
- throw re;
- } catch (SecurityException e) {
- throw re;
- } catch (InstantiationException e) {
- throw re;
- } catch (IllegalAccessException e) {
- throw re;
- } catch (InvocationTargetException e) {
+ } catch (NoSuchMethodException | IllegalArgumentException | SecurityException | InstantiationException
+ | IllegalAccessException | InvocationTargetException e) {
throw re;
}
}
@@ -85,12 +78,6 @@ public class RPCUtil {
return instantiateException(cls, re);
}
- private static <T extends SessionNotRunning> T instantiateSessionNotRunningException(
- Class<? extends T> cls, RemoteException re) throws RemoteException {
- return instantiateException(cls, re);
- }
-
-
/**
* Utility method that unwraps and returns appropriate exceptions.
*
@@ -109,7 +96,7 @@ public class RPCUtil {
} else {
if (cause instanceof RemoteException) {
RemoteException re = (RemoteException) cause;
- Class<?> realClass = null;
+ Class<?> realClass;
try {
realClass = Class.forName(re.getClassName());
} catch (ClassNotFoundException cnf) {
diff --git a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
index 73becdaa6..5bfb41c58 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ReflectionUtils.java
@@ -29,9 +29,11 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.tez.dag.api.TezReflectionException;
@Private
-public class ReflectionUtils {
+public final class ReflectionUtils {
- private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<String, Class<?>>();
+ private static final Map<String, Class<?>> CLAZZ_CACHE = new ConcurrentHashMap<>();
+
+ private ReflectionUtils() {}
@Private
public static Class<?> getClazz(String className) throws TezReflectionException {
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
index 5c2876444..ca5fcdf3e 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezCommonUtils.java
@@ -55,7 +55,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
import com.google.protobuf.ByteString;
@Private
-public class TezCommonUtils {
+public final class TezCommonUtils {
public static final FsPermission TEZ_AM_DIR_PERMISSION = FsPermission
.createImmutable((short) 0700); // rwx--------
public static final FsPermission TEZ_AM_FILE_PERMISSION = FsPermission
@@ -64,6 +64,8 @@ public class TezCommonUtils {
public static final String TEZ_SYSTEM_SUB_DIR = ".tez";
+ private TezCommonUtils() {}
+
/**
* <p>
* This function returns the staging directory defined in the config with
@@ -222,7 +224,6 @@ public class TezCommonUtils {
* @param conf
* Tez configuration
* @return App recovery path
- * @throws IOException
*/
@Private
public static Path getRecoveryPath(Path tezSysStagingPath, Configuration conf)
@@ -288,7 +289,6 @@ public class TezCommonUtils {
* Filesystem
* @param dir
* directory to be created
- * @throws IOException
*/
public static void mkDirForAM(FileSystem fs, Path dir) throws IOException {
FsPermission perm = new FsPermission(TEZ_AM_DIR_PERMISSION);
@@ -312,7 +312,6 @@ public class TezCommonUtils {
* @param filePath
* file path to create the file
* @return FSDataOutputStream
- * @throws IOException
*/
public static FSDataOutputStream createFileForAM(FileSystem fs, Path filePath) throws IOException {
return FileSystem.create(fs, filePath, new FsPermission(TEZ_AM_FILE_PERMISSION));
@@ -417,7 +416,7 @@ public class TezCommonUtils {
}
StringBuilder sb = new StringBuilder();
- sb.append("Credentials: #" + identifier + "Tokens=").append(credentials.numberOfTokens());
+ sb.append("Credentials: #").append(identifier).append("Tokens=").append(credentials.numberOfTokens());
if (credentials.numberOfTokens() > 0) {
sb.append(", Services=");
sb.append(credentials.getAllTokens().stream()
@@ -435,16 +434,14 @@ public class TezCommonUtils {
Token<JobTokenIdentifier> jobToken) throws IOException {
DataOutputBuffer dob = new DataOutputBuffer();
jobToken.write(dob);
- ByteBuffer bb = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- return bb;
+ return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
public static Credentials parseCredentialsBytes(byte[] credentialsBytes) throws IOException {
Credentials credentials = new Credentials();
DataInputBuffer dib = new DataInputBuffer();
try {
- byte[] tokenBytes = credentialsBytes;
- dib.reset(tokenBytes, tokenBytes.length);
+ dib.reset(credentialsBytes, credentialsBytes.length);
credentials.readTokenStorageStream(dib);
return credentials;
} finally {
@@ -459,7 +456,7 @@ public class TezCommonUtils {
}
public static Collection<String> tokenizeString(String str, String delim) {
- List<String> values = new ArrayList<String>();
+ List<String> values = new ArrayList<>();
if (str == null || str.isEmpty())
return values;
StringTokenizer tokenizer = new StringTokenizer(str, delim);
@@ -533,7 +530,7 @@ public class TezCommonUtils {
if (val > 0 && val < TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM) {
return TezConstants.TEZ_AM_CLIENT_HEARTBEAT_TIMEOUT_SECS_MINIMUM * 1000;
}
- return val * 1000;
+ return val * 1000L;
}
/**
@@ -570,7 +567,7 @@ public class TezCommonUtils {
if (timeoutSecs == 0) {
timeoutSecs = 1;
}
- return 1000l * timeoutSecs;
+ return 1000L * timeoutSecs;
}
public static int getJavaVersion() {
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
index 1c0be98dc..88920a4e5 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezUtils.java
@@ -21,7 +21,6 @@ package org.apache.tez.common;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -49,10 +48,12 @@ import org.xerial.snappy.SnappyOutputStream;
* {@link org.apache.hadoop.conf.Configuration} to {@link org.apache.tez.dag.api.UserPayload} etc.
*/
@InterfaceAudience.Public
-public class TezUtils {
+public final class TezUtils {
private static final Logger LOG = LoggerFactory.getLogger(TezUtils.class);
+ private TezUtils() {}
+
/**
* Allows changing the log level for task / AM logging. </p>
*
@@ -73,18 +74,12 @@ public class TezUtils {
* @param conf
* : Configuration to be converted
* @return PB ByteString (compressed)
- * @throws java.io.IOException
*/
public static ByteString createByteStringFromConf(Configuration conf) throws IOException {
Objects.requireNonNull(conf, "Configuration must be specified");
ByteString.Output os = ByteString.newOutput();
- SnappyOutputStream compressOs = new SnappyOutputStream(os);
- try {
+ try (SnappyOutputStream compressOs = new SnappyOutputStream(os)) {
writeConfInPB(compressOs, conf);
- } finally {
- if (compressOs != null) {
- compressOs.close();
- }
}
return os.toByteString();
}
@@ -95,7 +90,6 @@ public class TezUtils {
*
* @param conf configuration to be converted
* @return an instance of {@link org.apache.tez.dag.api.UserPayload}
- * @throws java.io.IOException
*/
public static UserPayload createUserPayloadFromConf(Configuration conf) throws IOException {
return UserPayload.create(ByteBuffer.wrap(createByteStringFromConf(conf).toByteArray()));
@@ -113,11 +107,10 @@ public class TezUtils {
* @param byteString byteString representation of the conf created using {@link
* #createByteStringFromConf(org.apache.hadoop.conf.Configuration)}
* @return Configuration
- * @throws java.io.IOException
*/
public static Configuration createConfFromByteString(ByteString byteString) throws IOException {
Objects.requireNonNull(byteString, "ByteString must be specified");
- try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput());) {
+ try(SnappyInputStream uncompressIs = new SnappyInputStream(byteString.newInput())) {
DAGProtos.ConfigurationProto confProto = createConfProto(uncompressIs);
Configuration conf = new Configuration(false);
readConfFromPB(confProto, conf);
@@ -156,7 +149,6 @@ public class TezUtils {
* @param payload {@link org.apache.tez.dag.api.UserPayload} created using {@link
* #createUserPayloadFromConf(org.apache.hadoop.conf.Configuration)}
* @return Configuration
- * @throws java.io.IOException
*/
public static Configuration createConfFromUserPayload(UserPayload payload) throws IOException {
return createConfFromByteString(ByteString.copyFrom(payload.getPayload()));
@@ -186,12 +178,10 @@ public class TezUtils {
}
if (conf != null) {
JSONObject confJson = new JSONObject();
- Iterator<Entry<String, String>> iter = conf.iterator();
- while (iter.hasNext()) {
- Entry<String, String> entry = iter.next();
+ for (Entry<String, String> entry : conf) {
String key = entry.getKey();
String val = conf.get(entry.getKey());
- if(val != null) {
+ if (val != null) {
confJson.put(key, val);
} else {
LOG.debug("null value in Configuration after replacement for key={}. Skipping.", key);
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java b/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
index bd6de11a5..1e5d4bdf4 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezYARNUtils.java
@@ -35,7 +35,7 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
@Private
-public class TezYARNUtils {
+public final class TezYARNUtils {
private static Logger LOG = LoggerFactory.getLogger(TezYARNUtils.class);
public static final String ENV_NAME_REGEX = "[A-Za-z_][A-Za-z0-9_]*";
@@ -49,6 +49,8 @@ public class TezYARNUtils {
+ "([^,]*)" // val group
);
+ private TezYARNUtils() {}
+
public static String getFrameworkClasspath(Configuration conf, boolean usingArchive) {
StringBuilder classpathBuilder = new StringBuilder();
boolean userClassesTakesPrecedence =
@@ -126,9 +128,11 @@ public class TezYARNUtils {
// Add PWD:PWD/*
classpathBuilder.append(Environment.PWD.$())
- .append(File.pathSeparator)
- .append(Environment.PWD.$() + File.separator + "*")
- .append(File.pathSeparator);
+ .append(File.pathSeparator)
+ .append(Environment.PWD.$())
+ .append(File.separator)
+ .append("*")
+ .append(File.pathSeparator);
}
public static void appendToEnvFromInputString(Map<String, String> env,
@@ -161,7 +165,7 @@ public class TezYARNUtils {
public static void setEnvIfAbsentFromInputString(Map<String, String> env,
String envString) {
if (envString != null && envString.length() > 0) {
- String childEnvs[] = envString.split(",");
+ String[] childEnvs = envString.split(",");
for (String cEnv : childEnvs) {
String[] parts = cEnv.split("="); // split on '='
Matcher m = VAR_SUBBER .matcher(parts[1]);
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/Master.java b/tez-api/src/main/java/org/apache/tez/common/security/Master.java
index de73d1009..d0b8d16c4 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/Master.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/Master.java
@@ -29,12 +29,14 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
@Private
@Unstable
-public class Master {
+public final class Master {
public enum State {
- INITIALIZING, RUNNING;
+ INITIALIZING, RUNNING
}
+ private Master() {}
+
public static String getMasterUserName(Configuration conf) {
return conf.get(YarnConfiguration.RM_PRINCIPAL);
}
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
index e56ef6127..21b1026d3 100644
--- a/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
+++ b/tez-api/src/main/java/org/apache/tez/common/security/TokenCache.java
@@ -45,14 +45,16 @@ import org.apache.tez.dag.api.TezConfiguration;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class TokenCache {
+public final class TokenCache {
private static final Logger LOG = LoggerFactory.getLogger(TokenCache.class);
+ private TokenCache() {}
+
/**
* auxiliary method to get user's secret keys..
- * @param alias
+ *
* @return secret key from the storage
*/
public static byte[] getSecretKey(Credentials credentials, Text alias) {
@@ -64,10 +66,9 @@ public class TokenCache {
/**
* Convenience method to obtain delegation tokens from namenodes
* corresponding to the paths passed.
- * @param credentials
+ * @param credentials credentials
* @param ps array of paths
* @param conf configuration
- * @throws IOException
*/
public static void obtainTokensForFileSystems(Credentials credentials,
Path[] ps, Configuration conf) throws IOException {
@@ -80,7 +81,7 @@ public class TokenCache {
private static final int MAX_FS_OBJECTS = 10;
static void obtainTokensForFileSystemsInternal(Credentials credentials,
Path[] ps, Configuration conf) throws IOException {
- Set<FileSystem> fsSet = new HashSet<FileSystem>();
+ Set<FileSystem> fsSet = new HashSet<>();
boolean limitExceeded = false;
for(Path p: ps) {
FileSystem fs = p.getFileSystem(conf);
@@ -107,8 +108,8 @@ public class TokenCache {
conf.getStrings(TezConfiguration.TEZ_JOB_FS_SERVERS_TOKEN_RENEWAL_EXCLUDE);
if (nns != null) {
String host = fs.getUri().getHost();
- for(int i = 0; i < nns.length; i++) {
- if (nns[i].equals(host)) {
+ for (String nn : nns) {
+ if (nn.equals(host)) {
return true;
}
}
@@ -118,11 +119,6 @@ public class TokenCache {
/**
* get delegation token for a specific FS
- * @param fs
- * @param credentials
- * @param p
- * @param conf
- * @throws IOException
*/
static void obtainTokensForFileSystemsInternal(FileSystem fs,
Credentials credentials, Configuration conf) throws IOException {
@@ -137,7 +133,7 @@ public class TokenCache {
}
}
- final Token<?> tokens[] = fs.addDelegationTokens(delegTokenRenewer,
+ final Token<?>[] tokens = fs.addDelegationTokens(delegTokenRenewer,
credentials);
if (tokens != null) {
for (Token<?> token : tokens) {
@@ -150,7 +146,6 @@ public class TokenCache {
/**
* store session specific token
- * @param t
*/
@InterfaceAudience.Private
public static void setSessionToken(Token<? extends TokenIdentifier> t,
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
index c563f1fb4..a55e45a7f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DagTypeConverters.java
@@ -90,7 +90,9 @@ import com.google.protobuf.ByteString.Output;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
@Private
-public class DagTypeConverters {
+public final class DagTypeConverters {
+
+ private DagTypeConverters() {}
public static PlanLocalResourceVisibility convertToDAGPlan(LocalResourceVisibility visibility){
switch(visibility){
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfigurationConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfigurationConstants.java
index 33abc77de..4bd5e254f 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfigurationConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfigurationConstants.java
@@ -27,12 +27,14 @@ import org.apache.tez.common.annotation.ConfigurationProperty;
*/
@ConfigurationClass(templateFileName = "tez-conf-constants.xml")
@Private
-public class TezConfigurationConstants {
+public final class TezConfigurationConstants {
static {
TezConfiguration.setupConfigurationScope(TezConfigurationConstants.class);
}
+ private TezConfigurationConstants() {}
+
/**
* String value. Set automatically by the client. The host name of the client the Tez application
* was submitted from.
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
index cf5ab11ec..379eb0cb1 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConstants.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
* Specifies all constant values in Tez
*/
@Private
-public class TezConstants {
+public final class TezConstants {
public static final String TEZ_APPLICATION_MASTER_CLASS =
@@ -129,4 +129,5 @@ public class TezConstants {
public static final double TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_SMALL_SLAB = 0.7;
public static final double TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION_LARGE_SLAB = 0.8;
+ private TezConstants() {}
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
index fc1595fa4..2d0476a72 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/TimelineReaderFactory.java
@@ -61,7 +61,7 @@ import org.slf4j.LoggerFactory;
*
*/
@InterfaceAudience.Private
-public class TimelineReaderFactory {
+public final class TimelineReaderFactory {
private static final Logger LOG = LoggerFactory.getLogger(TimelineReaderFactory.class);
@@ -79,6 +79,8 @@ public class TimelineReaderFactory {
private static Class<?> delegationTokenAuthenticatorClazz = null;
private static Method delegationTokenAuthenticateURLOpenConnectionMethod = null;
+ private TimelineReaderFactory() {}
+
public static TimelineReaderStrategy getTimelineReaderStrategy(Configuration conf,
boolean useHttps,
int connTimeout) throws TezException {
@@ -140,17 +142,14 @@ public class TimelineReaderFactory {
* auth strategy for secured and unsecured environment with delegation token (hadoop 2.6 and above)
*/
private static class TimelineReaderTokenAuthenticatedStrategy implements TimelineReaderStrategy {
- private final Configuration conf;
private final boolean useHttps;
private final int connTimeout;
- private ConnectionConfigurator connectionConfigurator;
- private SSLFactory sslFactory;
+ private final SSLFactory sslFactory;
public TimelineReaderTokenAuthenticatedStrategy(final Configuration conf,
final boolean useHttps,
final int connTimeout) {
- this.conf = conf;
this.useHttps = useHttps;
this.connTimeout = connTimeout;
this.sslFactory = useHttps ? new SSLFactory(CLIENT, conf) : null;
@@ -161,11 +160,10 @@ public class TimelineReaderFactory {
Authenticator authenticator;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation realUgi = ugi.getRealUser();
- UserGroupInformation authUgi;
String doAsUser;
ClientConfig clientConfig = new DefaultClientConfig(JSONRootElementProvider.App.class);
- connectionConfigurator = getNewConnectionConf(conf, useHttps,
- connTimeout, sslFactory);
+ ConnectionConfigurator connectionConfigurator = getNewConnectionConf(useHttps,
+ connTimeout, sslFactory);
try {
authenticator = getTokenAuthenticator();
@@ -175,17 +173,15 @@ public class TimelineReaderFactory {
}
if (realUgi != null) {
- authUgi = realUgi;
doAsUser = ugi.getShortUserName();
} else {
- authUgi = ugi;
doAsUser = null;
}
HttpURLConnectionFactory connectionFactory;
try {
connectionFactory = new TokenAuthenticatedURLConnectionFactory(connectionConfigurator, authenticator,
- authUgi, doAsUser);
+ doAsUser);
} catch (TezException e) {
throw new IOException("Fail to create TokenAuthenticatedURLConnectionFactory", e);
}
@@ -208,17 +204,14 @@ public class TimelineReaderFactory {
private final Authenticator authenticator;
private final ConnectionConfigurator connConfigurator;
- private final UserGroupInformation authUgi;
private final String doAsUser;
private final AuthenticatedURL.Token token;
public TokenAuthenticatedURLConnectionFactory(ConnectionConfigurator connConfigurator,
Authenticator authenticator,
- UserGroupInformation authUgi,
String doAsUser) throws TezException {
this.connConfigurator = connConfigurator;
this.authenticator = authenticator;
- this.authUgi = authUgi;
this.doAsUser = doAsUser;
this.token = ReflectionUtils.createClazzInstance(
DELEGATION_TOKEN_AUTHENTICATED_URL_TOKEN_CLASS_NAME, null, null);
@@ -264,15 +257,14 @@ public class TimelineReaderFactory {
final boolean useHttps,
final int connTimeout) {
sslFactory = useHttps ? new SSLFactory(CLIENT, conf) : null;
- connectionConf = getNewConnectionConf(conf, useHttps, connTimeout, sslFactory);
+ connectionConf = getNewConnectionConf(useHttps, connTimeout, sslFactory);
}
@Override
public Client getHttpClient() {
ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory(connectionConf);
- Client httpClient = new Client(new URLConnectionClientHandler(urlFactory), config);
- return httpClient;
+ return new Client(new URLConnectionClientHandler(urlFactory), config);
}
@VisibleForTesting
@@ -289,7 +281,7 @@ public class TimelineReaderFactory {
URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
HttpURLConnection httpURLConnection =
- (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+ (HttpURLConnection) (new URL(url + tokenString)).openConnection();
this.connectionConf.configure(httpURLConnection);
return httpURLConnection;
@@ -304,14 +296,13 @@ public class TimelineReaderFactory {
}
}
- private static ConnectionConfigurator getNewConnectionConf(final Configuration conf,
- final boolean useHttps,
+ private static ConnectionConfigurator getNewConnectionConf(final boolean useHttps,
final int connTimeout,
final SSLFactory sslFactory) {
ConnectionConfigurator connectionConf = null;
if (useHttps) {
try {
- connectionConf = getNewSSLConnectionConf(conf, connTimeout, sslFactory);
+ connectionConf = getNewSSLConnectionConf(connTimeout, sslFactory);
} catch (IOException e) {
LOG.debug("Cannot load customized ssl related configuration."
+ " Falling back to system-generic settings.", e);
@@ -321,7 +312,7 @@ public class TimelineReaderFactory {
if (connectionConf == null) {
connectionConf = new ConnectionConfigurator() {
@Override
- public HttpURLConnection configure(HttpURLConnection httpURLConnection) throws IOException {
+ public HttpURLConnection configure(HttpURLConnection httpURLConnection) {
setTimeouts(httpURLConnection, connTimeout);
return httpURLConnection;
}
@@ -331,8 +322,7 @@ public class TimelineReaderFactory {
return connectionConf;
}
- private static ConnectionConfigurator getNewSSLConnectionConf(final Configuration conf,
- final int connTimeout,
+ private static ConnectionConfigurator getNewSSLConnectionConf(final int connTimeout,
final SSLFactory sslFactory)
throws IOException {
final SSLSocketFactory sslSocketFactory;
diff --git a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
index 4f33348f8..728076cbe 100644
--- a/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
+++ b/tez-api/src/test/java/org/apache/tez/client/TestTezClient.java
@@ -522,7 +522,7 @@ public class TestTezClient {
assertTrue("Time taken is not as expected",
(endTime - startTime) > timeout);
verify(spyClient, times(0)).submitDAG(any());
- Assert.assertTrue("Unexpected Exception message",
+ Assert.assertTrue("Unexpected Exception message: " + te.getMessage(),
te.getMessage().contains("Tez AM not ready"));
}
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezTestUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezTestUtils.java
index 4b9d5c321..1cbacbd13 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezTestUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezTestUtils.java
@@ -21,12 +21,14 @@ package org.apache.tez.common;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-public class TezTestUtils {
+public final class TezTestUtils {
+
+ private TezTestUtils() {}
+
/**
* Ensures a reasonably high limit for yarn disk utilization. This is very important for tests,
* as devs keep bumping into silent test hangs where yarn simply considers their machines as unhealthy,
* as the default limit is 90%, even if a machine with 90% full disk is still able to function.
- * @param conf
*/
public static void ensureHighDiskUtilizationLimit(Configuration conf) {
if (conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 7ee5bb457..bdfc6e02c 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -24,7 +24,7 @@ import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.BitSet;
import java.util.HashSet;
import java.util.List;
@@ -67,10 +67,12 @@ import org.slf4j.LoggerFactory;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
@Private
-public class TezUtilsInternal {
+public final class TezUtilsInternal {
private static final Logger LOG = LoggerFactory.getLogger(TezUtilsInternal.class);
+ private TezUtilsInternal() {}
+
public static ConfigurationProto readUserSpecifiedTezConfiguration(String baseDir) throws
IOException {
File confPBFile = new File(baseDir, TezConstants.TEZ_PB_BINARY_CONF_NAME);
@@ -88,7 +90,7 @@ public class TezUtilsInternal {
}
}
- public static byte[] compressBytes(byte[] inBytes) throws IOException {
+ public static byte[] compressBytes(byte[] inBytes) {
StopWatch sw = new StopWatch().start();
byte[] compressed = compressBytesInflateDeflate(inBytes);
sw.stop();
@@ -120,8 +122,7 @@ public class TezUtilsInternal {
int count = deflater.deflate(buffer);
bos.write(buffer, 0, count);
}
- byte[] output = bos.toByteArray();
- return output;
+ return bos.toByteArray();
}
private static byte[] uncompressBytesInflateDeflate(byte[] inBytes) throws IOException {
@@ -138,8 +139,7 @@ public class TezUtilsInternal {
}
bos.write(buffer, 0, count);
}
- byte[] output = bos.toByteArray();
- return output;
+ return bos.toByteArray();
}
private static final Pattern pattern = Pattern.compile("\\W");
@@ -154,8 +154,7 @@ public class TezUtilsInternal {
private static String sanitizeString(String srcString) {
Matcher matcher = pattern.matcher(srcString);
- String res = matcher.replaceAll("_");
- return res; // Number starts allowed rightnow
+ return matcher.replaceAll("_"); // Number starts allowed rightnow
}
public static void updateLoggers(Configuration configuration, String addend, String patternString)
@@ -168,15 +167,14 @@ public class TezUtilsInternal {
if (appender != null) {
if (appender instanceof TezContainerLogAppender) {
TezContainerLogAppender claAppender = (TezContainerLogAppender) appender;
- claAppender
- .setLogFileName(constructLogFileName(TezConstants.TEZ_CONTAINER_LOG_FILE_NAME, addend));
+ claAppender.setLogFileName(constructLogFileName(
+ addend));
// there was a configured pattern
if (patternString != null) {
PatternLayout layout = (PatternLayout) claAppender.getLayout();
layout.setConversionPattern(patternString);
}
-
claAppender.activateOptions();
} else {
LOG.warn("Appender is a " + appender.getClass() + "; require an instance of "
@@ -188,11 +186,11 @@ public class TezUtilsInternal {
}
}
- private static String constructLogFileName(String base, String addend) {
+ private static String constructLogFileName(String addend) {
if (addend == null || addend.isEmpty()) {
- return base;
+ return TezConstants.TEZ_CONTAINER_LOG_FILE_NAME;
} else {
- return base + "_" + addend;
+ return TezConstants.TEZ_CONTAINER_LOG_FILE_NAME + "_" + addend;
}
}
@@ -225,7 +223,6 @@ public class TezUtilsInternal {
/**
* Convert DAGPlan to text. Skip sensitive informations like credentials.
*
- * @param dagPlan
* @return a string representation of the dag plan with sensitive information removed
*/
public static String convertDagPlanToString(DAGProtos.DAGPlan dagPlan) throws IOException {
@@ -238,7 +235,7 @@ public class TezUtilsInternal {
DagTypeConverters.convertByteStringToCredentials(dagPlan.getCredentialsBinary());
TextFormat.printField(entry.getKey(),
ByteString.copyFrom(TezCommonUtils.getCredentialsInfo(credentials,"dag").getBytes(
- Charset.forName("UTF-8"))), sb);
+ StandardCharsets.UTF_8)), sb);
}
}
return sb.toString();
@@ -266,8 +263,6 @@ public class TezUtilsInternal {
return TaskAttemptTerminationCause.NODE_FAILED;
case CONTAINER_EXITED:
return TaskAttemptTerminationCause.CONTAINER_EXITED;
- case OTHER:
- return TaskAttemptTerminationCause.UNKNOWN_ERROR;
default:
return TaskAttemptTerminationCause.UNKNOWN_ERROR;
}
diff --git a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
index 84a9474a3..9ccfc76be 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/utils/RelocalizationUtils.java
@@ -31,15 +31,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.ReflectionUtils;
-import org.apache.tez.dag.api.TezException;
import com.google.common.collect.Lists;
@InterfaceAudience.Private
-public class RelocalizationUtils {
+public final class RelocalizationUtils {
+
+ private RelocalizationUtils() {}
public static List<URL> processAdditionalResources(Map<String, URI> additionalResources,
- Configuration conf, String destDir) throws IOException, TezException {
+ Configuration conf, String destDir) throws IOException {
if (additionalResources == null || additionalResources.isEmpty()) {
return Collections.emptyList();
}
diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java
index b352334f4..1f9fb6a65 100644
--- a/tez-dag/src/main/java/org/apache/tez/Utils.java
+++ b/tez-dag/src/main/java/org/apache/tez/Utils.java
@@ -37,10 +37,10 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
@InterfaceAudience.Private
-/**
+/*
* Utility class within the tez-dag module
*/
-public class Utils {
+public final class Utils {
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
@@ -49,6 +49,8 @@ public class Utils {
*/
private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+ private Utils() {}
+
public static String getContainerLauncherIdentifierString(int launcherIndex, AppContext appContext) {
String name;
try {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java
index 9e4f2b4a6..fba82613d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImplHelpers.java
@@ -25,9 +25,11 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TaskAttemptImplHelpers {
+public final class TaskAttemptImplHelpers {
private static final Logger LOG = LoggerFactory.getLogger(TaskAttemptImplHelpers.class);
+
+ private TaskAttemptImplHelpers() {}
static String[] resolveHosts(String[] src) {
String[] result = new String[src.length];
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
index 19cf5b7e2..a0407c0a4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java
@@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.TreeMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +47,6 @@ import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.security.TokenCache;
import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.records.TezDAGID;
@@ -56,18 +54,20 @@ import org.apache.tez.dag.utils.TezRuntimeChildJVM;
import com.google.common.annotations.VisibleForTesting;
-public class AMContainerHelpers {
+public final class AMContainerHelpers {
private static final Logger LOG = LoggerFactory.getLogger(AMContainerHelpers.class);
- private static Object commonContainerSpecLock = new Object();
+ private static final Object COMMON_CONTAINER_SPEC_LOCK = new Object();
private static TezDAGID lastDAGID = null;
- private static Map<TezDAGID, ContainerLaunchContext> commonContainerSpecs =
- new HashMap<TezDAGID, ContainerLaunchContext>();
+ private static final Map<TezDAGID, ContainerLaunchContext> COMMON_CONTAINER_SPECS =
+ new HashMap<>();
+
+ private AMContainerHelpers() {}
public static void dagComplete(TezDAGID dagId) {
- synchronized (commonContainerSpecLock) {
- commonContainerSpecs.remove(dagId);
+ synchronized (COMMON_CONTAINER_SPEC_LOCK) {
+ COMMON_CONTAINER_SPECS.remove(dagId);
}
}
@@ -89,24 +89,21 @@ public class AMContainerHelpers {
/**
* Create the common {@link ContainerLaunchContext} for all attempts.
- *
- * @param applicationACLs
- * @param auxiliaryService
*/
private static ContainerLaunchContext createCommonContainerLaunchContext(
Map<ApplicationAccessType, String> applicationACLs,
Credentials credentials, String auxiliaryService) {
// Application environment
- Map<String, String> environment = new HashMap<String, String>();
+ Map<String, String> environment = new HashMap<>();
// Service data
- Map<String, ByteBuffer> serviceData = new HashMap<String, ByteBuffer>();
+ Map<String, ByteBuffer> serviceData = new HashMap<>();
// Tokens
// Setup up task credentials buffer
- ByteBuffer containerCredentialsBuffer = ByteBuffer.wrap(new byte[] {});
+ ByteBuffer containerCredentialsBuffer;
try {
Credentials containerCredentials = new Credentials();
@@ -135,10 +132,8 @@ public class AMContainerHelpers {
// Construct the actual Container
// The null fields are per-container and will be constructed for each
// container separately.
- ContainerLaunchContext container =
- ContainerLaunchContext.newInstance(null, environment, null,
- serviceData, containerCredentialsBuffer, applicationACLs);
- return container;
+ return ContainerLaunchContext.newInstance(null, environment, null,
+ serviceData, containerCredentialsBuffer, applicationACLs);
}
@VisibleForTesting
@@ -153,14 +148,14 @@ public class AMContainerHelpers {
AppContext appContext, Resource containerResource,
Configuration conf, String auxiliaryService) {
- ContainerLaunchContext commonContainerSpec = null;
- synchronized (commonContainerSpecLock) {
- if (!commonContainerSpecs.containsKey(tezDAGID)) {
+ ContainerLaunchContext commonContainerSpec;
+ synchronized (COMMON_CONTAINER_SPEC_LOCK) {
+ if (!COMMON_CONTAINER_SPECS.containsKey(tezDAGID)) {
commonContainerSpec =
createCommonContainerLaunchContext(acls, credentials, auxiliaryService);
- commonContainerSpecs.put(tezDAGID, commonContainerSpec);
+ COMMON_CONTAINER_SPECS.put(tezDAGID, commonContainerSpec);
} else {
- commonContainerSpec = commonContainerSpecs.get(tezDAGID);
+ commonContainerSpec = COMMON_CONTAINER_SPECS.get(tezDAGID);
}
// Ensure that we remove container specs for previous AMs to reduce
@@ -168,14 +163,14 @@ public class AMContainerHelpers {
if (lastDAGID == null) {
lastDAGID = tezDAGID;
} else if (!lastDAGID.equals(tezDAGID)) {
- commonContainerSpecs.remove(lastDAGID);
+ COMMON_CONTAINER_SPECS.remove(lastDAGID);
lastDAGID = tezDAGID;
}
}
// Setup environment by cloning from common env.
Map<String, String> env = commonContainerSpec.getEnvironment();
- Map<String, String> myEnv = new HashMap<String, String>(env.size());
+ Map<String, String> myEnv = new HashMap<>(env.size());
myEnv.putAll(env);
myEnv.putAll(vertexEnv);
@@ -197,17 +192,15 @@ public class AMContainerHelpers {
appContext.getApplicationAttemptId().getAttemptId(), modifiedJavaOpts);
// Duplicate the ByteBuffers for access by multiple containers.
- Map<String, ByteBuffer> myServiceData = new HashMap<String, ByteBuffer>();
+ Map<String, ByteBuffer> myServiceData = new HashMap<>();
for (Entry<String, ByteBuffer> entry : commonContainerSpec.getServiceData()
.entrySet()) {
myServiceData.put(entry.getKey(), entry.getValue().duplicate());
}
// Construct the actual Container
- ContainerLaunchContext container =
- ContainerLaunchContext.newInstance(localResources, myEnv, commands,
- myServiceData, commonContainerSpec.getTokens().duplicate(), acls);
- return container;
+ return ContainerLaunchContext.newInstance(localResources, myEnv, commands,
+ myServiceData, commonContainerSpec.getTokens().duplicate(), acls);
}
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 8f4cd1fa6..5cc940fc4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -51,14 +51,16 @@ import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
-public class HistoryEventJsonConversion {
+public final class HistoryEventJsonConversion {
+
+ private HistoryEventJsonConversion() {}
public static JSONObject convertToJson(HistoryEvent historyEvent) throws JSONException {
if (!historyEvent.isHistoryEvent()) {
throw new UnsupportedOperationException("Invalid Event, does not support history"
+ ", eventType=" + historyEvent.getEventType());
}
- JSONObject jsonObject = null;
+ JSONObject jsonObject;
switch (historyEvent.getEventType()) {
case APP_LAUNCHED:
jsonObject = convertAppLaunchedEvent((AppLaunchedEvent) historyEvent);
@@ -402,7 +404,7 @@ public class HistoryEventJsonConversion {
JSONObject otherInfo = new JSONObject();
if (event.getVertexNameIDMap() != null) {
- Map<String, String> nameIdStrMap = new TreeMap<String, String>();
+ Map<String, String> nameIdStrMap = new TreeMap<>();
for (Entry<String, TezVertexID> entry : event.getVertexNameIDMap().entrySet()) {
nameIdStrMap.put(entry.getKey(), entry.getValue().toString());
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index f59e9ace7..77e7179c2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -56,7 +56,7 @@ import org.codehaus.jettison.json.JSONObject;
import org.apache.tez.common.Preconditions;
-public class DAGUtils {
+public final class DAGUtils {
public static final String DAG_NAME_KEY = "dagName";
public static final String DAG_INFO_KEY = "dagInfo";
@@ -99,7 +99,7 @@ public class DAGUtils {
public static final String VERTEX_GROUP_EDGE_MERGED_INPUTS_KEY = "edgeMergedInputs";
public static final String VERTEX_GROUP_DESTINATION_VERTEX_NAME_KEY = "destinationVertexName";
-
+ private DAGUtils() {}
public static JSONObject generateSimpleJSONPlan(DAGPlan dagPlan) throws JSONException {
JSONObject dagJson;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
index cc89b9fb4..16d59c42f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/TezEventUtils.java
@@ -32,7 +32,9 @@ import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
-public class TezEventUtils {
+public final class TezEventUtils {
+
+ private TezEventUtils() {}
public static TezEventProto toProto(TezEvent event) throws IOException {
TezEventProto.Builder evtBuilder =
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
index 56e46a055..e179b4fd9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/ProtoUtils.java
@@ -24,7 +24,9 @@ import org.apache.tez.dag.recovery.records.RecoveryProtos;
import com.google.protobuf.ByteString;
-public class ProtoUtils {
+public final class ProtoUtils {
+
+ private ProtoUtils() {}
public static RecoveryProtos.SummaryEventProto toSummaryEventProto(
TezDAGID dagID, long timestamp, HistoryEventType historyEventType, byte[] payload) {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
index b7e6f724f..25551a1cc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezBuilderUtils.java
@@ -27,7 +27,9 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-public class TezBuilderUtils {
+public final class TezBuilderUtils {
+
+ private TezBuilderUtils() {}
public static TezVertexID newVertexID(TezDAGID dagId, int vertexId) {
return TezVertexID.getInstance(dagId, vertexId);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
index 9458193dc..b4871bd04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/utils/TezRuntimeChildJVM.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.runtime.task.TezChild;
-public class TezRuntimeChildJVM {
+public final class TezRuntimeChildJVM {
+
+ private TezRuntimeChildJVM() {}
// FIXME
- public static enum LogName {
+ public enum LogName {
/** Log on the stdout of the task. */
STDOUT ("stdout"),
@@ -47,9 +49,9 @@ public class TezRuntimeChildJVM {
/** Log the debug script's stdout */
DEBUGOUT ("debugout");
- private String prefix;
+ private final String prefix;
- private LogName(String prefix) {
+ LogName(String prefix) {
this.prefix = prefix;
}
@@ -71,7 +73,7 @@ public class TezRuntimeChildJVM {
int applicationAttemptNumber,
String javaOpts) {
- Vector<String> vargs = new Vector<String>(9);
+ Vector<String> vargs = new Vector<>(9);
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
@@ -101,7 +103,7 @@ public class TezRuntimeChildJVM {
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
}
- Vector<String> vargsFinal = new Vector<String>(1);
+ Vector<String> vargsFinal = new Vector<>(1);
vargsFinal.add(mergedCommand.toString());
return vargsFinal;
}
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java
index fb6faa1bb..5009cefff 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/PluginWrapperTestHelpers.java
@@ -14,9 +14,7 @@
package org.apache.tez.dag.app;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import java.lang.reflect.Constructor;
@@ -30,15 +28,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PluginWrapperTestHelpers {
+public final class PluginWrapperTestHelpers {
private static final Logger LOG = LoggerFactory.getLogger(PluginWrapperTestHelpers.class);
+ private PluginWrapperTestHelpers() {}
+
public static void testDelegation(Class<?> delegateClass, Class<?> rawClass,
Set<String> skipMethods) throws Exception {
TrackingAnswer answer = new TrackingAnswer();
Object mock = mock(rawClass, answer);
- Constructor ctor = delegateClass.getConstructor(rawClass);
+ Constructor<?> ctor = delegateClass.getConstructor(rawClass);
Object wrapper = ctor.newInstance(mock);
// Run through all the methods on the wrapper, and invoke the methods. Constructs
@@ -48,7 +48,7 @@ public class PluginWrapperTestHelpers {
if (method.getDeclaringClass().equals(delegateClass) &&
!skipMethods.contains(method.getName())) {
- assertTrue(method.getExceptionTypes().length == 1);
+ assertEquals(1, method.getExceptionTypes().length);
assertEquals(Exception.class, method.getExceptionTypes()[0]);
LOG.info("Checking method [{}] with parameterTypes [{}]", method.getName(), Arrays.toString(method.getParameterTypes()));
@@ -65,8 +65,8 @@ public class PluginWrapperTestHelpers {
if (answer.compareAsPrimitive) {
assertEquals(answer.lastRetValue, result);
} else {
- assertTrue("Expected: " + System.identityHashCode(answer.lastRetValue) + ", actual=" +
- System.identityHashCode(result), answer.lastRetValue == result);
+ assertSame("Expected: " + System.identityHashCode(answer.lastRetValue) + ", actual=" +
+ System.identityHashCode(result), answer.lastRetValue, result);
}
}
}
@@ -74,8 +74,7 @@ public class PluginWrapperTestHelpers {
}
- public static Object[] constructMethodArgs(Method method) throws IllegalAccessException,
- InstantiationException {
+ public static Object[] constructMethodArgs(Method method) {
Class<?>[] paramTypes = method.getParameterTypes();
Object[] params = new Object[paramTypes.length];
for (int i = 0; i < paramTypes.length; i++) {
@@ -112,7 +111,7 @@ public class PluginWrapperTestHelpers {
} else if (clazz.equals(int.class)) {
return 224;
} else if (clazz.equals(long.class)) {
- return 445l;
+ return 445L;
} else if (clazz.equals(float.class)) {
return 2.24f;
} else if (clazz.equals(double.class)) {
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 490067a54..08941e762 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -18,8 +18,7 @@
package org.apache.tez.dag.app.rm;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -76,7 +75,9 @@ import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
-class TestTaskSchedulerHelpers {
+final class TestTaskSchedulerHelpers {
+
+ private TestTaskSchedulerHelpers() {}
// Mocking AMRMClientImpl to make use of getMatchingRequest
static class AMRMClientForTest extends AMRMClientImpl<CookieContainerRequest> {
@@ -143,9 +144,8 @@ class TestTaskSchedulerHelpers {
static class TaskSchedulerManagerForTest extends
TaskSchedulerManager {
- private TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync;
- private ContainerSignatureMatcher containerSignatureMatcher;
- private UserPayload defaultPayload;
+ private final TezAMRMClientAsync<CookieContainerRequest> amrmClientAsync;
+ private final UserPayload defaultPayload;
@SuppressWarnings("rawtypes")
public TaskSchedulerManagerForTest(AppContext appContext,
@@ -157,7 +157,6 @@ class TestTaskSchedulerHelpers {
Lists.newArrayList(new NamedEntityDescriptor("FakeScheduler", null)),
false, new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
this.amrmClientAsync = amrmClientAsync;
- this.containerSignatureMatcher = containerSignatureMatcher;
this.defaultPayload = defaultPayload;
}
@@ -170,7 +169,6 @@ class TestTaskSchedulerHelpers {
super(appContext, null, eventHandler, containerSignatureMatcher, null, descriptors,
false, new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim());
this.amrmClientAsync = amrmClientAsync;
- this.containerSignatureMatcher = containerSignatureMatcher;
this.defaultPayload = defaultPayload;
}
@@ -212,7 +210,7 @@ class TestTaskSchedulerHelpers {
@SuppressWarnings("rawtypes")
static class CapturingEventHandler implements EventHandler {
- private Queue<Event> events = new ConcurrentLinkedQueue<Event>();
+ private final Queue<Event> events = new ConcurrentLinkedQueue<Event>();
public void handle(Event event) {
events.add(event);
@@ -224,7 +222,7 @@ class TestTaskSchedulerHelpers {
public void verifyNoInvocations(Class<? extends Event> eventClass) {
for (Event e : events) {
- assertFalse(e.getClass().getName().equals(eventClass.getName()));
+ assertNotEquals(e.getClass().getName(), eventClass.getName());
}
}
@@ -262,8 +260,8 @@ class TestTaskSchedulerHelpers {
static class TaskSchedulerContextDrainable implements TaskSchedulerContext {
int completedEvents;
int invocations;
- private TaskSchedulerContext real;
- private CountingExecutorService countingExecutorService;
+ private final TaskSchedulerContext real;
+ private final CountingExecutorService countingExecutorService;
final AtomicInteger count = new AtomicInteger(0);
public TaskSchedulerContextDrainable(TaskSchedulerContextImplWrapper real) {
@@ -441,10 +439,7 @@ class TestTaskSchedulerHelpers {
@Override
public boolean isExactMatch(Object cs1, Object cs2) {
- if (cs1 == cs2 && cs1 != null) {
- return true;
- }
- return false;
+ return cs1 == cs2 && cs1 != null;
}
@Override
diff --git a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
index 074f77fa1..c143e21d7 100644
--- a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
+++ b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java
@@ -18,29 +18,17 @@
package org.apache.tez.examples;
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.EnumSet;
-import java.util.Set;
-
import org.apache.hadoop.util.ProgramDriver;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
/**
* A description of an example program based on its class and a
* human-readable description.
*/
-public class ExampleDriver {
+public final class ExampleDriver {
- private static final DecimalFormat formatter = new DecimalFormat("###.##%");
+ private ExampleDriver() {}
- public static void main(String argv[]){
+ public static void main(String[] argv){
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
index bf4a5bdae..e088ef791 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/service/TezTestServiceConfConstants.java
@@ -14,7 +14,7 @@
package org.apache.tez.service;
-public class TezTestServiceConfConstants {
+public final class TezTestServiceConfConstants {
private static final String TEZ_TEST_SERVICE_PREFIX = "tez.test.service.";
@@ -38,4 +38,5 @@ public class TezTestServiceConfConstants {
public static final String TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS = TEZ_TEST_SERVICE_PREFIX + "communicator.num.threads";
public static final int TEZ_TEST_SERVICE_AM_COMMUNICATOR_NUM_THREADS_DEFAULT = 2;
+ private TezTestServiceConfConstants() {}
}
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
index 25d61d0bf..a595210df 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/util/ProtoConverters.java
@@ -37,7 +37,9 @@ import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.IOSpecProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos.TaskSpecProto.Builder;
-public class ProtoConverters {
+public final class ProtoConverters {
+
+ private ProtoConverters() {}
public static TaskSpec getTaskSpecfromProto(TaskSpecProto taskSpecProto) {
TezTaskAttemptID taskAttemptID =
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
index db156d2ce..394c871ab 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReaderTez.java
@@ -41,13 +41,15 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class SplitMetaInfoReaderTez {
+public final class SplitMetaInfoReaderTez {
public static final Logger LOG = LoggerFactory.getLogger(SplitMetaInfoReaderTez.class);
public static final int META_SPLIT_VERSION = JobSplit.META_SPLIT_VERSION;
public static final byte[] META_SPLIT_FILE_HEADER = JobSplit.META_SPLIT_FILE_HEADER;
+ private SplitMetaInfoReaderTez() {}
+
private static FSDataInputStream getFSDataIS(Configuration conf,
FileSystem fs) throws IOException {
long maxMetaInfoSize = conf.getLong(
@@ -69,7 +71,7 @@ public class SplitMetaInfoReaderTez {
+ FileSystem.getDefaultUri(conf));
}
- FileStatus fStatus = null;
+ FileStatus fStatus;
try {
fStatus = fs.getFileStatus(metaSplitFile);
if (maxMetaInfoSize > 0 && fStatus.getLen() > maxMetaInfoSize) {
@@ -131,7 +133,6 @@ public class SplitMetaInfoReaderTez {
* @param fs FileSystem.
* @param index the index of the task.
* @return split meta info object of the task.
- * @throws IOException
*/
public static TaskSplitMetaInfo getSplitMetaInfo(Configuration conf,
FileSystem fs, int index) throws IOException {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/common/MRFrameworkConfigs.java b/tez-mapreduce/src/main/java/org/apache/tez/common/MRFrameworkConfigs.java
index 6831b52ec..4cbb28530 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/common/MRFrameworkConfigs.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/common/MRFrameworkConfigs.java
@@ -23,7 +23,7 @@ package org.apache.tez.common;
import org.apache.hadoop.classification.InterfaceAudience;
@InterfaceAudience.Private
-public class MRFrameworkConfigs {
+public final class MRFrameworkConfigs {
/**
* One local dir for the specific job.
@@ -38,4 +38,6 @@ public class MRFrameworkConfigs {
public static final String TASK_LOCAL_RESOURCE_DIR_DEFAULT = "/tmp";
public static final String JOB_LOCAL_DIR = MR_FRAMEWORK_PREFIX + "job.local.dir";
+
+ private MRFrameworkConfigs() {}
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
index 050ba7912..670ee5db4 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/Utils.java
@@ -33,7 +33,9 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
@Private
-public class Utils {
+public final class Utils {
+
+ private Utils() {}
/**
* Gets a handle to the Statistics instance based on the scheme associated
@@ -46,7 +48,7 @@ public class Utils {
*/
@Private
public static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
- List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
+ List<Statistics> matchedStats = new ArrayList<>();
path = path.getFileSystem(conf).makeQualified(path);
String scheme = path.toUri().getScheme();
for (Statistics stats : FileSystem.getAllStatistics()) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
index 098057ba1..871206054 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java
@@ -27,7 +27,7 @@ import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
-public class DeprecatedKeys {
+public final class DeprecatedKeys {
@@ -35,13 +35,13 @@ public class DeprecatedKeys {
/**
* Keys used by the DAG - mainly the AM.
*/
- private static Map<String, String> mrParamToDAGParamMap = new HashMap<String, String>();
+ private static final Map<String, String> MR_PARAM_TO_DAG_PARAM_MAP = new HashMap<>();
/**
* Keys used by the Tez Runtime.
*/
- private static Map<String, String> mrParamToTezRuntimeParamMap =
- new HashMap<String, String>();
+ private static final Map<String, String> MR_PARAM_TO_TEZ_RUNTIME_PARAM_MAP =
+ new HashMap<>();
@@ -51,20 +51,22 @@ public class DeprecatedKeys {
addDeprecatedKeys();
}
+ private DeprecatedKeys() {}
+
private static void populateMRToDagParamMap() {
// TODO Default value handling.
- mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT);
- mrParamToDAGParamMap.put(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER,
TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE);
- mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE,
TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED);
- mrParamToDAGParamMap.put(
+ MR_PARAM_TO_DAG_PARAM_MAP.put(
MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
TezConfiguration.TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD);
- mrParamToDAGParamMap.put(MRJobConfig.QUEUE_NAME,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.QUEUE_NAME,
TezConfiguration.TEZ_QUEUE_NAME);
// Counter replacement will work in this manner, as long as TezCounters
@@ -73,23 +75,23 @@ public class DeprecatedKeys {
// may break.
// Framework counters, like FILESYSTEM will likely be incompatible since
// they enum key belongs to a different package.
- mrParamToDAGParamMap.put(MRJobConfig.COUNTERS_MAX_KEY,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.COUNTERS_MAX_KEY,
TezConfiguration.TEZ_COUNTERS_MAX);
- mrParamToDAGParamMap.put(MRJobConfig.COUNTER_GROUPS_MAX_KEY,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.COUNTER_GROUPS_MAX_KEY,
TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
- mrParamToDAGParamMap.put(MRJobConfig.COUNTER_NAME_MAX_KEY,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.COUNTER_NAME_MAX_KEY,
TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
- mrParamToDAGParamMap.put(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.COUNTER_GROUP_NAME_MAX_KEY,
TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
- mrParamToDAGParamMap.put(MRJobConfig.TASK_TIMEOUT,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.TASK_TIMEOUT,
TezConfiguration.TASK_HEARTBEAT_TIMEOUT_MS);
- mrParamToDAGParamMap.put(MRJobConfig.JOB_TAGS,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.JOB_TAGS,
TezConfiguration.TEZ_APPLICATION_TAGS);
- mrParamToDAGParamMap.put(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST,
TezConfiguration.TEZ_USER_CLASSPATH_FIRST);
- mrParamToDAGParamMap.put(MRJobConfig.JOB_NAMENODES,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.JOB_NAMENODES,
TezConfiguration.TEZ_JOB_FS_SERVERS);
- mrParamToDAGParamMap.put(MRJobConfig.JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE,
+ MR_PARAM_TO_DAG_PARAM_MAP.put(MRJobConfig.JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE,
TezConfiguration.TEZ_JOB_FS_SERVERS_TOKEN_RENEWAL_EXCLUDE);
}
@@ -181,14 +183,14 @@ public class DeprecatedKeys {
private static void registerMRToRuntimeKeyTranslation(String mrKey,
String tezKey) {
- mrParamToTezRuntimeParamMap.put(mrKey, tezKey);
+ MR_PARAM_TO_TEZ_RUNTIME_PARAM_MAP.put(mrKey, tezKey);
}
public static Map<String, String> getMRToDAGParamMap() {
- return Collections.unmodifiableMap(mrParamToDAGParamMap);
+ return Collections.unmodifiableMap(MR_PARAM_TO_DAG_PARAM_MAP);
}
public static Map<String, String> getMRToTezRuntimeParamMap() {
- return Collections.unmodifiableMap(mrParamToTezRuntimeParamMap);
+ return Collections.unmodifiableMap(MR_PARAM_TO_TEZ_RUNTIME_PARAM_MAP);
}
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
index 2fddd0052..8656147e2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/IDConverter.java
@@ -28,7 +28,9 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
-public class IDConverter {
+public final class IDConverter {
+
+ private IDConverter() {}
// FIXME hardcoded assumption that one app is one dag
public static JobID toMRJobId(TezDAGID dagId) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
index 1d06b7aea..dd13eb205 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -45,10 +45,11 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
*/
@Public
@Evolving
-public class MRHelpers {
+public final class MRHelpers {
private static final Logger LOG = LoggerFactory.getLogger(MRHelpers.class);
+ private MRHelpers() {}
/**
* Translate MapReduce configuration keys to the equivalent Tez keys in the provided
@@ -105,7 +106,7 @@ public class MRHelpers {
}
private static void convertVertexConfToTez(Configuration vertexConf, boolean preferTez) {
- setStageKeysFromBaseConf(vertexConf, vertexConf, "unknown");
+ setStageKeysFromBaseConf(vertexConf, vertexConf);
processDirectConversion(vertexConf, preferTez);
setupMRComponents(vertexConf);
}
@@ -136,7 +137,7 @@ public class MRHelpers {
* require translation to tez keys.
*/
private static void setStageKeysFromBaseConf(Configuration conf,
- Configuration baseConf, String stage) {
+ Configuration baseConf) {
// Don't clobber explicit tez config.
JobConf jobConf = null;
if (conf.get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS) == null) {
@@ -151,7 +152,7 @@ public class MRHelpers {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_KEY_CLASS
- + " for stage: " + stage
+ + " for stage: unknown"
+ " based on job level configuration. Value: "
+ conf.get(MRJobConfig.MAP_OUTPUT_KEY_CLASS));
}
@@ -168,7 +169,7 @@ public class MRHelpers {
.getMapOutputValueClass().getName());
if (LOG.isDebugEnabled()) {
LOG.debug("Setting " + MRJobConfig.MAP_OUTPUT_VALUE_CLASS
- + " for stage: " + stage
+ + " for stage: unknown"
+ " based on job level configuration. Value: "
+ conf.get(MRJobConfig.MAP_OUTPUT_VALUE_CLASS));
}
@@ -223,7 +224,7 @@ public class MRHelpers {
private static String getLog4jCmdLineProperties(Configuration conf,
boolean isMap) {
- Vector<String> logProps = new Vector<String>(4);
+ Vector<String> logProps = new Vector<>(4);
TezUtils.addLog4jSystemProperties(getChildLogLevel(conf, isMap), logProps);
StringBuilder sb = new StringBuilder();
for (String str : logProps) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
index a8e85a34e..26bba4d00 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java
@@ -82,6 +82,8 @@ public class MRInputHelpers {
static final String JOB_SPLIT_RESOURCE_NAME = "job.split";
static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = "job.splitmetainfo";
+ protected MRInputHelpers() {}
+
/**
* Setup split generation on the client, with splits being distributed via the traditional
* MapReduce mechanism of distributing splits via the Distributed Cache.
@@ -107,7 +109,7 @@ public class MRInputHelpers {
public static DataSourceDescriptor configureMRInputWithLegacySplitGeneration(Configuration conf,
Path splitsDir,
boolean useLegacyInput) {
- InputSplitInfo inputSplitInfo = null;
+ InputSplitInfo inputSplitInfo;
try {
inputSplitInfo = generateInputSplits(conf, splitsDir);
@@ -117,17 +119,11 @@ public class MRInputHelpers {
Map<String, LocalResource> additionalLocalResources = new HashMap<String, LocalResource>();
updateLocalResourcesForInputSplits(conf, inputSplitInfo,
additionalLocalResources);
- DataSourceDescriptor dsd =
- DataSourceDescriptor.create(inputDescriptor, null, inputSplitInfo.getNumTasks(),
- inputSplitInfo.getCredentials(),
- VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
- additionalLocalResources);
- return dsd;
- } catch (IOException e) {
- throw new TezUncheckedException("Failed to generate InputSplits", e);
- } catch (InterruptedException e) {
- throw new TezUncheckedException("Failed to generate InputSplits", e);
- } catch (ClassNotFoundException e) {
+ return DataSourceDescriptor.create(inputDescriptor, null, inputSplitInfo.getNumTasks(),
+ inputSplitInfo.getCredentials(),
+ VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()),
+ additionalLocalResources);
+ } catch (IOException | InterruptedException | ClassNotFoundException e) {
throw new TezUncheckedException("Failed to generate InputSplits", e);
}
}
@@ -139,7 +135,6 @@ public class MRInputHelpers {
* @param payload the {@link org.apache.tez.dag.api.UserPayload} instance
* @return an instance of {@link org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto},
* which provides access to the underlying configuration bytes
- * @throws IOException
*/
@InterfaceStability.Evolving
@InterfaceAudience.LimitedPrivate({"hive, pig"})
@@ -156,7 +151,6 @@ public class MRInputHelpers {
* instance representing the split
* @param serializationFactory the serialization mechanism used to write out the split
* @return an instance of the split
- * @throws java.io.IOException
*/
@SuppressWarnings("unchecked")
@InterfaceStability.Evolving
@@ -192,7 +186,6 @@ public class MRInputHelpers {
* instance representing the split
* @param serializationFactory the serialization mechanism used to write out the split
* @return an instance of the split
- * @throws IOException
*/
@InterfaceStability.Evolving
@SuppressWarnings("unchecked")
@@ -222,7 +215,7 @@ public class MRInputHelpers {
@InterfaceStability.Evolving
public static <T extends org.apache.hadoop.mapreduce.InputSplit> MRRuntimeProtos.MRSplitProto createSplitProto(
T newSplit, SerializationFactory serializationFactory)
- throws IOException, InterruptedException {
+ throws IOException {
MRRuntimeProtos.MRSplitProto.Builder builder = MRRuntimeProtos.MRSplitProto
.newBuilder();
@@ -278,9 +271,6 @@ public class MRInputHelpers {
* @param targetTasks the number of target tasks if grouping is enabled. Specify as 0 otherwise.
* @return an instance of {@link InputSplitInfoMem} which supports a subset of
* the APIs defined on {@link InputSplitInfo}
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
*/
@InterfaceStability.Unstable
@InterfaceAudience.LimitedPrivate({"hive, pig"})
@@ -310,16 +300,13 @@ public class MRInputHelpers {
* @param targetTasks the number of target tasks if grouping is enabled. Specify as 0 otherwise.
* @return an instance of {@link InputSplitInfoMem} which supports a subset of
* the APIs defined on {@link InputSplitInfo}
- * @throws IOException
- * @throws ClassNotFoundException
- * @throws InterruptedException
*/
@InterfaceStability.Unstable
public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
boolean groupSplits, boolean sortSplits, int targetTasks)
throws IOException, ClassNotFoundException, InterruptedException {
- InputSplitInfoMem splitInfoMem = null;
+ InputSplitInfoMem splitInfoMem;
JobConf jobConf = new JobConf(conf);
if (jobConf.getUseNewMapper()) {
LOG.debug("Generating mapreduce api input splits");
@@ -356,7 +343,7 @@ public class MRInputHelpers {
if (rack == null) {
if (input.getLocations() != null) {
return TaskLocationHint.createTaskLocationHint(
- new HashSet<String>(Arrays.asList(input.getLocations())), null);
+ new HashSet<>(Arrays.asList(input.getLocations())), null);
} else {
return TaskLocationHint.createTaskLocationHint(null, null);
}
@@ -366,7 +353,7 @@ public class MRInputHelpers {
}
} else {
return TaskLocationHint.createTaskLocationHint(
- new HashSet<String>(Arrays.asList(input.getLocations())), null);
+ new HashSet<>(Arrays.asList(input.getLocations())), null);
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -399,7 +386,7 @@ public class MRInputHelpers {
}
} else {
return TaskLocationHint.createTaskLocationHint(
- new HashSet<String>(Arrays.asList(input.getLocations())),
+ new HashSet<>(Arrays.asList(input.getLocations())),
null);
}
} catch (IOException e) {
@@ -413,20 +400,20 @@ public class MRInputHelpers {
@SuppressWarnings({ "rawtypes", "unchecked" })
private static org.apache.hadoop.mapreduce.InputSplit[] generateNewSplits(
JobContext jobContext, boolean groupSplits, boolean sortSplits,
- int numTasks) throws ClassNotFoundException, IOException,
+ int numTasks) throws IOException,
InterruptedException {
Configuration conf = jobContext.getConfiguration();
// This is the real input format.
- org.apache.hadoop.mapreduce.InputFormat<?, ?> inputFormat = null;
+ org.apache.hadoop.mapreduce.InputFormat<?, ?> inputFormat;
try {
inputFormat = ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
} catch (ClassNotFoundException e) {
throw new TezUncheckedException(e);
}
- org.apache.hadoop.mapreduce.InputFormat<?, ?> finalInputFormat = inputFormat;
+ org.apache.hadoop.mapreduce.InputFormat<?, ?> finalInputFormat;
// For grouping, the underlying InputFormatClass class is passed in as a parameter.
// JobContext has this setup as TezGroupedSplitInputFormat
@@ -443,7 +430,7 @@ public class MRInputHelpers {
List<org.apache.hadoop.mapreduce.InputSplit> array = finalInputFormat
.getSplits(jobContext);
- org.apache.hadoop.mapreduce.InputSplit[] splits = (org.apache.hadoop.mapreduce.InputSplit[]) array
+ org.apache.hadoop.mapreduce.InputSplit[] splits = array
.toArray(new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
if (sortSplits) {
@@ -469,7 +456,7 @@ public class MRInputHelpers {
throw new TezUncheckedException(e);
}
- org.apache.hadoop.mapred.InputFormat finalInputFormat = inputFormat;
+ org.apache.hadoop.mapred.InputFormat finalInputFormat;
if (groupSplits) {
org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat groupedFormat =
@@ -502,16 +489,8 @@ public class MRInputHelpers {
try {
long len1 = o1.getLength();
long len2 = o2.getLength();
- if (len1 < len2) {
- return 1;
- } else if (len1 == len2) {
- return 0;
- } else {
- return -1;
- }
- } catch (IOException ie) {
- throw new RuntimeException("exception in InputSplit compare", ie);
- } catch (InterruptedException ie) {
+ return Long.compare(len2, len1);
+ } catch (IOException | InterruptedException ie) {
throw new RuntimeException("exception in InputSplit compare", ie);
}
}
@@ -528,13 +507,7 @@ public class MRInputHelpers {
try {
long len1 = o1.getLength();
long len2 = o2.getLength();
- if (len1 < len2) {
- return 1;
- } else if (len1 == len2) {
- return 0;
- } else {
- return -1;
- }
+ return Long.compare(len2, len1);
} catch (IOException ie) {
throw new RuntimeException("Problem getting input split size", ie);
}
@@ -549,10 +522,6 @@ public class MRInputHelpers {
* @return InputSplitInfo containing the split files' information and the
* location hints for each split generated to be used to determining parallelism of
* the map stage.
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
*/
private static InputSplitInfoDisk writeNewSplits(JobContext jobContext,
Path inputSplitDir) throws IOException, InterruptedException,
@@ -568,10 +537,10 @@ public class MRInputHelpers {
List<TaskLocationHint> locationHints =
new ArrayList<TaskLocationHint>(splits.length);
- for (int i = 0; i < splits.length; ++i) {
+ for (org.apache.hadoop.mapreduce.InputSplit split : splits) {
locationHints.add(
- TaskLocationHint.createTaskLocationHint(new HashSet<String>(
- Arrays.asList(splits[i].getLocations())), null)
+ TaskLocationHint.createTaskLocationHint(new HashSet<String>(
+ Arrays.asList(split.getLocations())), null)
);
}
@@ -589,8 +558,6 @@ public class MRInputHelpers {
* @return InputSplitInfo containing the split files' information and the
* number of splits generated to be used to determining parallelism of
* the map stage.
- *
- * @throws IOException
*/
private static InputSplitInfoDisk writeOldSplits(JobConf jobConf,
Path inputSplitDir) throws IOException {
@@ -602,11 +569,11 @@ public class MRInputHelpers {
inputSplitDir.getFileSystem(jobConf), splits);
List<TaskLocationHint> locationHints =
- new ArrayList<TaskLocationHint>(splits.length);
- for (int i = 0; i < splits.length; ++i) {
+ new ArrayList<>(splits.length);
+ for (InputSplit split : splits) {
locationHints.add(
- TaskLocationHint.createTaskLocationHint(new HashSet<String>(
- Arrays.asList(splits[i].getLocations())), null)
+ TaskLocationHint.createTaskLocationHint(new HashSet<>(
+ Arrays.asList(split.getLocations())), null)
);
}
@@ -637,10 +604,6 @@ public class MRInputHelpers {
* @return InputSplitInfo containing the split files' information and the
* number of splits generated to be used to determining parallelism of
* the map stage.
- *
- * @throws IOException
- * @throws InterruptedException
- * @throws ClassNotFoundException
*/
private static InputSplitInfoDisk generateInputSplits(Configuration conf,
Path inputSplitsDir) throws IOException, InterruptedException,
@@ -666,7 +629,6 @@ public class MRInputHelpers {
* @param conf Configuration
* @param inputSplitInfo Information on location of split files
* @param localResources LocalResources collection to be updated
- * @throws IOException
*/
private static void updateLocalResourcesForInputSplits(
Configuration conf,
@@ -751,8 +713,8 @@ public class MRInputHelpers {
}
private static UserPayload createMRInputPayload(ByteString bytes,
- MRRuntimeProtos.MRSplitsProto mrSplitsProto,
- boolean isGrouped, boolean isSorted) throws IOException {
+ MRRuntimeProtos.MRSplitsProto mrSplitsProto,
+ boolean isGrouped, boolean isSorted) {
MRRuntimeProtos.MRInputUserPayloadProto.Builder userPayloadBuilder =
MRRuntimeProtos.MRInputUserPayloadProto
.newBuilder();
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
index 3f5ad230c..de38766cd 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfToTezTranslator.java
@@ -25,13 +25,14 @@ import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-public class MultiStageMRConfToTezTranslator {
+public final class MultiStageMRConfToTezTranslator {
+
+ private MultiStageMRConfToTezTranslator() {}
/**
* Given a single base MRR config, returns a list of complete stage
* configurations.
- *
- * @param conf
+ *
* @return list of complete stage configurations given Conifiguration
*/
@Private
@@ -62,14 +63,13 @@ public class MultiStageMRConfToTezTranslator {
}
}
- Configuration confs[] = new Configuration[numStages];
+ Configuration[] confs = new Configuration[numStages];
Configuration nonItermediateConf = MultiStageMRConfigUtil.extractStageConf(
conf, "");
+ confs[0] = nonItermediateConf;
if (numStages == 1) {
- confs[0] = nonItermediateConf;
confs[0].setBoolean(MRConfig.IS_MAP_PROCESSOR, true);
} else {
- confs[0] = nonItermediateConf;
confs[numStages - 1] = new Configuration(nonItermediateConf);
confs[numStages -1].setBoolean(MRConfig.IS_MAP_PROCESSOR, false);
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
index 13e0b860e..23ffd3ef7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
@@ -18,14 +18,15 @@
package org.apache.tez.mapreduce.hadoop;
-import java.util.Iterator;
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@Private
-public class MultiStageMRConfigUtil {
+public final class MultiStageMRConfigUtil {
+
+ private MultiStageMRConfigUtil() {}
//////////////////////////////////////////////////////////////////////////////
// Methods based on Stage Num //
@@ -73,9 +74,7 @@ public class MultiStageMRConfigUtil {
String prefix) {
Configuration strippedConf = new Configuration(false);
Configuration conf = new Configuration(false);
- Iterator<Entry<String, String>> confEntries = baseConf.iterator();
- while (confEntries.hasNext()) {
- Entry<String, String> entry = confEntries.next();
+ for (Entry<String, String> entry : baseConf) {
String key = entry.getKey();
if (key.startsWith(prefix)) {
// Ignore keys for other intermediate stages in case of an initial or final stage.
@@ -95,9 +94,7 @@ public class MultiStageMRConfigUtil {
}
// Replace values from strippedConf into the finalConf. Override values
// which may have been copied over from the baseConf root level.
- Iterator<Entry<String, String>> entries = strippedConf.iterator();
- while (entries.hasNext()) {
- Entry<String, String> entry = entries.next();
+ for (Entry<String, String> entry : strippedConf) {
if (!Configuration.isDeprecated(entry.getKey())) {
conf.set(entry.getKey(), entry.getValue());
}
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
index 6f9c1c760..0efde4ac7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/TezTypeConverters.java
@@ -29,7 +29,9 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
-public class TezTypeConverters {
+public final class TezTypeConverters {
+
+ private TezTypeConverters() {}
// TODO Remove unused methods
@@ -46,15 +48,13 @@ public class TezTypeConverters {
public static TaskAttemptId toYarn(TezTaskAttemptID taskAttemptId) {
TaskAttemptID mrTaskAttemptId = IDConverter
.toMRTaskAttemptId(taskAttemptId);
- TaskAttemptId mrv2TaskAttemptId = TypeConverter.toYarn(mrTaskAttemptId);
- return mrv2TaskAttemptId;
+ return TypeConverter.toYarn(mrTaskAttemptId);
}
public static TezTaskAttemptID toTez(TaskAttemptId taskAttemptId) {
TaskAttemptID mrTaskAttemptId = TypeConverter.fromYarn(taskAttemptId);
- TezTaskAttemptID tezTaskAttemptId = IDConverter
+ return IDConverter
.fromMRTaskAttemptId(mrTaskAttemptId);
- return tezTaskAttemptId;
}
public static Counters fromTez(TezCounters tezCounters) {
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index e64d273b4..ee907f5d7 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -176,7 +176,7 @@ public class MRInput extends MRInputBase {
}
private void initializeInputPath() {
- Preconditions.checkState(inputFormatProvided == false,
+ Preconditions.checkState(!inputFormatProvided,
"Should only be invoked when no inputFormat is provided");
if (org.apache.hadoop.mapred.FileInputFormat.class.isAssignableFrom(inputFormat) ||
FileInputFormat.class.isAssignableFrom(inputFormat)) {
@@ -437,8 +437,6 @@ public class MRInput extends MRInputBase {
private final ReentrantLock rrLock = new ReentrantLock();
private final Condition rrInited = rrLock.newCondition();
-
- private volatile boolean eventReceived = false;
private boolean readerCreated = false;
@@ -537,24 +535,24 @@ public class MRInput extends MRInputBase {
@Override
public KeyValueReader getReader() throws IOException {
Preconditions
- .checkState(readerCreated == false,
+ .checkState(!readerCreated,
"Only a single instance of record reader can be created for this input.");
readerCreated = true;
if (getNumPhysicalInputs() == 0) {
return new KeyValueReader() {
@Override
- public boolean next() throws IOException {
+ public boolean next() {
getContext().notifyProgress();
return false;
}
@Override
- public Object getCurrentKey() throws IOException {
+ public Object getCurrentKey() {
return null;
}
@Override
- public Object getCurrentValue() throws IOException {
+ public Object getCurrentValue() {
return null;
}
};
@@ -576,11 +574,11 @@ public class MRInput extends MRInputBase {
throw new IllegalStateException(
"Unexpected event. MRInput has been setup to receive 0 events");
}
- if (eventReceived || inputEvents.size() != 1) {
+
+ if (inputEvents.size() != 1) {
throw new IllegalStateException(
"MRInput expects only a single input. Received: current eventListSize: "
- + inputEvents.size() + "Received previous input: "
- + eventReceived);
+ + inputEvents.size() + "Received previous input: false");
}
Event event = inputEvents.iterator().next();
Preconditions.checkArgument(event instanceof InputDataInformationEvent,
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
index a2b87e04c..e3c4f0e3a 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRInputUtils.java
@@ -43,14 +43,15 @@ import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
* Helper methods for InputFormat based Inputs. Private to Tez.
*/
@Private
-public class MRInputUtils {
+public final class MRInputUtils {
private static final Logger LOG = LoggerFactory.getLogger(MRInputUtils.class);
+ private MRInputUtils() {}
+
public static TaskSplitMetaInfo getSplits(Configuration conf, int index) throws IOException {
- TaskSplitMetaInfo taskSplitMInfo = SplitMetaInfoReaderTez
+ return SplitMetaInfoReaderTez
.getSplitMetaInfo(conf, FileSystem.getLocal(conf), index);
- return taskSplitMInfo;
}
public static org.apache.hadoop.mapreduce.InputSplit getNewSplitDetailsFromEvent(
@@ -78,12 +79,10 @@ public class MRInputUtils {
try {
cls = (Class<org.apache.hadoop.mapreduce.InputSplit>) jobConf.getClassByName(className);
} catch (ClassNotFoundException ce) {
- IOException wrap = new IOException("Split class " + className + " not found");
- wrap.initCause(ce);
- throw wrap;
+ throw new IOException("Split class " + className + " not found", ce);
}
SerializationFactory factory = new SerializationFactory(jobConf);
- Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapreduce.InputSplit>) factory
+ Deserializer<org.apache.hadoop.mapreduce.InputSplit> deserializer = factory
.getDeserializer(cls);
deserializer.open(inFile);
org.apache.hadoop.mapreduce.InputSplit split = deserializer.deserialize(null);
@@ -111,12 +110,10 @@ public class MRInputUtils {
try {
cls = (Class<org.apache.hadoop.mapred.InputSplit>) jobConf.getClassByName(className);
} catch (ClassNotFoundException ce) {
- IOException wrap = new IOException("Split class " + className + " not found");
- wrap.initCause(ce);
- throw wrap;
+ throw new IOException("Split class " + className + " not found", ce);
}
SerializationFactory factory = new SerializationFactory(jobConf);
- Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = (Deserializer<org.apache.hadoop.mapred.InputSplit>) factory
+ Deserializer<org.apache.hadoop.mapred.InputSplit> deserializer = factory
.getDeserializer(cls);
deserializer.open(inFile);
org.apache.hadoop.mapred.InputSplit split = deserializer.deserialize(null);
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
index 83c28dd7b..5c90f8a9d 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TezTestUtils.java
@@ -29,10 +29,11 @@ import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.InputInitializerContext;
-import java.io.IOException;
import java.util.Set;
-public class TezTestUtils {
+public final class TezTestUtils {
+
+ private TezTestUtils() {}
public static TezTaskAttemptID getMockTaskAttemptId(
int jobId, int vertexId, int taskId, int taskAttemptId) {
@@ -62,7 +63,7 @@ public class TezTestUtils {
private final UserPayload payload;
private final Configuration vertexConfig;
- public TezRootInputInitializerContextForTest(UserPayload payload, Configuration vertexConfig) throws IOException {
+ public TezRootInputInitializerContextForTest(UserPayload payload, Configuration vertexConfig) {
appId = ApplicationId.newInstance(1000, 200);
this.payload = payload == null ? UserPayload.create(null) : payload;
this.vertexConfig = vertexConfig;
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
index 6bfc5badd..ba1acdf98 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/MapUtils.java
@@ -71,9 +71,11 @@ import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import com.google.common.collect.HashMultimap;
-public class MapUtils {
+public final class MapUtils {
private static final Logger LOG = LoggerFactory.getLogger(MapUtils.class);
+
+ private MapUtils() {}
public static void configureLocalDirs(Configuration conf, String localDir)
throws IOException {
@@ -101,7 +103,7 @@ public class MapUtils {
// JOB_LOCAL_DIR doesn't exist on this host -- Create it.
workDir = lDirAlloc.getLocalPathForWrite("work", conf);
FileSystem lfs = FileSystem.getLocal(conf).getRaw();
- boolean madeDir = false;
+ boolean madeDir;
try {
madeDir = lfs.mkdirs(workDir);
} catch (FileAlreadyExistsException e) {
@@ -127,8 +129,8 @@ public class MapUtils {
LOG.info("Generating data at path: " + file);
// create a file with length entries
@SuppressWarnings("deprecation")
- SequenceFile.Writer writer =
- SequenceFile.createWriter(fs, job, file,
+ SequenceFile.Writer writer =
+ SequenceFile.createWriter(fs, job, file,
LongWritable.class, Text.class);
try {
Random r = new Random(System.currentTimeMillis());
@@ -144,8 +146,8 @@ public class MapUtils {
writer.close();
}
- SequenceFileInputFormat<LongWritable, Text> format =
- new SequenceFileInputFormat<LongWritable, Text>();
+ SequenceFileInputFormat<LongWritable, Text> format =
+ new SequenceFileInputFormat<>();
InputSplit[] splits = format.getSplits(job, 1);
System.err.println("#split = " + splits.length + " ; " +
"#locs = " + splits[0].getLocations().length + "; " +
@@ -175,7 +177,7 @@ public class MapUtils {
String[] locations = split.getLocations();
- SplitMetaInfo info = null;
+ SplitMetaInfo info;
info = new JobSplit.SplitMetaInfo(locations, offset, split.getLength());
Path jobSplitMetaInfoFile = new Path(
@@ -209,7 +211,7 @@ public class MapUtils {
MapProcessor.class.getName()).setUserPayload(
TezUtils.createUserPayloadFromConf(jobConf));
- Token<JobTokenIdentifier> shuffleToken = new Token<JobTokenIdentifier>();
+ Token<JobTokenIdentifier> shuffleToken = new Token<>();
TaskSpec taskSpec = new TaskSpec(
TezTestUtils.getMockTaskAttemptId(0, 0, mapId, 0),
@@ -218,18 +220,18 @@ public class MapUtils {
inputSpecs,
outputSpecs, null, null);
- Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<String, ByteBuffer>();
+ Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>();
String auxiliaryService = jobConf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,
TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT);
serviceConsumerMetadata.put(auxiliaryService,
ShuffleUtils.convertJobTokenToBytes(shuffleToken));
- Map<String, String> envMap = new HashMap<String, String>();
+ Map<String, String> envMap = new HashMap<>();
ByteBuffer shufflePortBb = ByteBuffer.allocate(4).putInt(0, 8000);
AuxiliaryServiceHelper
.setServiceDataIntoEnv(auxiliaryService, shufflePortBb,
envMap);
- LogicalIOProcessorRuntimeTask task = new LogicalIOProcessorRuntimeTask(
+ return new LogicalIOProcessorRuntimeTask(
taskSpec,
0,
jobConf,
@@ -237,8 +239,7 @@ public class MapUtils {
umbilical,
serviceConsumerMetadata,
envMap,
- HashMultimap.<String, String>create(), null, "", new ExecutionContextImpl("localhost"),
+ HashMultimap.create(), null, "", new ExecutionContextImpl("localhost"),
Runtime.getRuntime().maxMemory(), true, new DefaultHadoopShim(), sharedExecutor);
- return task;
}
}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
index 94b50a6c5..7cf1e1af6 100644
--- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
+++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
@@ -42,11 +42,12 @@ import org.slf4j.LoggerFactory;
import java.util.List;
@InterfaceAudience.Private
-public class Utils {
+public final class Utils {
private static final String LOG4J_CONFIGURATION = "log4j.configuration";
private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(Utils.class);
+ private Utils() {}
/**
* Parse tez counters from json
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index b9457a076..49c78eab7 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -60,7 +60,9 @@ import org.apache.tez.dag.records.TezVertexID;
import com.google.common.collect.Lists;
-public class HistoryEventTimelineConversion {
+public final class HistoryEventTimelineConversion {
+
+ private HistoryEventTimelineConversion() {}
private static void validateEvent(HistoryEvent event) {
if (!event.isHistoryEvent()) {
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
index e4285708a..2c9ad8601 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/ProtoConverters.java
@@ -30,7 +30,7 @@ import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.events.EventProtos.VertexManagerEventProto;
-public class ProtoConverters {
+public final class ProtoConverters {
public static EventProtos.CustomProcessorEventProto convertCustomProcessorEventToProto(
CustomProcessorEvent event) {
@@ -168,4 +168,5 @@ public class ProtoConverters {
return event;
}
+ private ProtoConverters() {}
}
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java
index 02dc69c3e..933115757 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/common/TezConverterUtils.java
@@ -28,7 +28,9 @@ import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.internals.api.events.SystemEventProtos.TaskFailureTypeProto;
-public class TezConverterUtils {
+public final class TezConverterUtils {
+
+ private TezConverterUtils() {}
/**
* return a {@link URI} from a given url
@@ -36,7 +38,6 @@ public class TezConverterUtils {
* @param url
* url to convert
* @return path from {@link URL}
- * @throws URISyntaxException
*/
@Private
public static URI getURIFromYarnURL(URL url) throws URISyntaxException {
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
index 626d178ce..b6000ccae 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TaskExecutionTestHelpers.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -49,10 +50,12 @@ import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TaskExecutionTestHelpers {
+public final class TaskExecutionTestHelpers {
public static final String HEARTBEAT_EXCEPTION_STRING = "HeartbeatException";
+ private TaskExecutionTestHelpers() {}
+
// Uses static fields for signaling. Ensure only used by one test at a time.
public static class TestProcessor extends AbstractLogicalIOProcessor {
@@ -184,9 +187,7 @@ public class TaskExecutionTestHelpers {
LOG.info("Await completion");
processorLock.lock();
try {
- if (completed) {
- return;
- } else {
+ if (!completed) {
completionCondition.await();
}
} finally {
@@ -295,14 +296,14 @@ public class TaskExecutionTestHelpers {
private static final Logger LOG = LoggerFactory.getLogger(TezTaskUmbilicalForTest.class);
- private final List<TezEvent> requestEvents = new LinkedList<TezEvent>();
+ private final List<TezEvent> requestEvents = new LinkedList<>();
private final ReentrantLock umbilicalLock = new ReentrantLock();
private final Condition eventCondition = umbilicalLock.newCondition();
private boolean pendingEvent = false;
private boolean eventEnacted = false;
- volatile int getTaskInvocations = 0;
+ private final AtomicInteger taskInvocations = new AtomicInteger(0);
private boolean shouldThrowException = false;
private boolean shouldSendDieSignal = false;
@@ -461,20 +462,20 @@ public class TaskExecutionTestHelpers {
}
@Override
- public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ public long getProtocolVersion(String protocol, long clientVersion) {
return 0;
}
@Override
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion,
- int clientMethodsHash) throws IOException {
+ int clientMethodsHash) {
return null;
}
@Override
public ContainerTask getTask(ContainerContext containerContext) throws IOException {
// Return shouldDie = true
- getTaskInvocations++;
+ taskInvocations.incrementAndGet();
return new ContainerTask(null, true, null, null, false);
}
@@ -511,18 +512,20 @@ public class TaskExecutionTestHelpers {
umbilicalLock.unlock();
}
}
+
+ public int getTaskInvocations() {
+ return taskInvocations.get();
+ }
}
@SuppressWarnings("deprecation")
public static ContainerId createContainerId(ApplicationId appId) {
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
- ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
- return containerId;
+ return ContainerId.newInstance(appAttemptId, 1);
}
public static TaskReporter createTaskReporter(ApplicationId appId, TezTaskUmbilicalForTest umbilical) {
- TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
+ return new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
createContainerId(appId).toString());
- return taskReporter;
}
}
diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
index c3c4705c1..a570ab824 100644
--- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
+++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/task/TestContainerExecution.java
@@ -51,7 +51,7 @@ public class TestContainerExecution {
ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
getTaskFuture.get();
- assertEquals(1, umbilical.getTaskInvocations);
+ assertEquals(1, umbilical.getTaskInvocations());
} finally {
executor.shutdownNow();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/common/TezRuntimeFrameworkConfigs.java b/tez-runtime-library/src/main/java/org/apache/tez/common/TezRuntimeFrameworkConfigs.java
index 62bc232b8..61c0fcd31 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/common/TezRuntimeFrameworkConfigs.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/common/TezRuntimeFrameworkConfigs.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
* Configuration parameters which are internal to the Inputs and Outputs which exist in the Runtime
*/
@Private
-public class TezRuntimeFrameworkConfigs {
+public final class TezRuntimeFrameworkConfigs {
private static final String TEZ_RUNTIME_FRAMEWORK_PREFIX = "tez.runtime.framework.";
@@ -41,4 +41,6 @@ public class TezRuntimeFrameworkConfigs {
public static final String TEZ_RUNTIME_METRICS_SESSION_ID = TEZ_RUNTIME_FRAMEWORK_PREFIX +
"metrics.session.id";
public static final String TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT = "";
+
+ private TezRuntimeFrameworkConfigs() {}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 1c747af2b..de28286d9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -47,23 +47,22 @@ import org.apache.tez.runtime.library.conf.OrderedPartitionedKVOutputConfig.Sort
@Public
@Evolving
@ConfigurationClass(templateFileName = "tez-runtime-default-template.xml")
-public class TezRuntimeConfiguration {
+public final class TezRuntimeConfiguration {
private static final String TEZ_RUNTIME_PREFIX = "tez.runtime.";
- private static final Set<String> tezRuntimeKeys = new HashSet<String>();
- private static Set<String> umnodifiableTezRuntimeKeySet;
- private static final Set<String> otherKeys = new HashSet<String>();
- private static Set<String> unmodifiableOtherKeySet;
- private static Configuration defaultConf = new Configuration(false);
- private static final Map<String, String> tezRuntimeConfMap = new HashMap<String, String>();
- private static final Map<String, String> otherConfMap = new HashMap<String, String>();
+ private static final Set<String> TEZ_RUNTIME_KEYS = new HashSet<>();
+ private static final Set<String> UMNODIFIABLE_TEZ_RUNTIME_KEY_SET;
+ private static final Set<String> OTHER_KEYS = new HashSet<>();
+ private static final Set<String> UNMODIFIABLE_OTHER_KEY_SET;
+ private static final Configuration DEFAULT_CONF = new Configuration(false);
+ private static final Map<String, String> TEZ_RUNTIME_CONF_MAP = new HashMap<>();
+ private static final Map<String, String> OTHER_CONF_MAP = new HashMap<>();
/**
* Prefixes from Hadoop configuration which are allowed.
*/
- private static final List<String> allowedPrefixes = new ArrayList<String>();
- private static List<String> unmodifiableAllowedPrefixes;
+ private static final List<String> ALLOWED_PREFIXES = new ArrayList<>();
static {
@@ -71,6 +70,8 @@ public class TezRuntimeConfiguration {
TezRuntimeConfiguration.TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
}
+ private TezRuntimeConfiguration() {}
+
/**
* Configuration key to enable/disable IFile readahead.
*/
@@ -250,7 +251,7 @@ public class TezRuntimeConfiguration {
@Private
@Unstable
@ConfigurationProperty(type = "integer")
- /**
+ /*
* Expert setting made available only for debugging. Do not change it. Sets
* the number of retries before giving up on downloading from source
* attempt by consumer. Code internally handles the threshold if set to -1.
@@ -264,7 +265,7 @@ public class TezRuntimeConfiguration {
@Private
@Unstable
@ConfigurationProperty(type = "float")
- /**
+ /*
* Expert setting made available only for debugging. Do not change it. Setting
* to determine if failures happened across a percentage of nodes. This
* helps in determining if the consumer has to be restarted on continuous
@@ -280,7 +281,7 @@ public class TezRuntimeConfiguration {
@Private
@Unstable
@ConfigurationProperty(type = "integer")
- /**
+ /*
* Expert setting made available only for debugging. Do not change it. Setting
* to determine if the consumer has to be restarted on continuous
* failures across nodes. Used along with {@link
@@ -294,7 +295,7 @@ public class TezRuntimeConfiguration {
@Private
@Unstable
@ConfigurationProperty(type = "float")
- /**
+ /*
* Expert setting made available only for debugging. Do not change it.
* Maximum percentage of time (compared to overall progress), the fetcher is
* allowed before concluding that it is stalled.
@@ -307,7 +308,7 @@ public class TezRuntimeConfiguration {
@Private
@Unstable
@ConfigurationProperty(type = "float")
- /**
+ /*
* Expert setting made available only for debugging. Do not change it.
* Fraction to determine whether the shuffle has progressed enough or not
* If it has not progressed enough, it could be qualified for the consumer.
@@ -321,7 +322,7 @@ public class TezRuntimeConfiguration {
@Private
@Unstable
@ConfigurationProperty(type = "float")
- /**
+ /*
* Expert setting made available only for debugging. Do not change it.
* Provides threshold for determining whether fetching has to be marked
* unhealthy based on the ratio of (failures/(failures+completed))
@@ -335,7 +336,7 @@ public class TezRuntimeConfiguration {
@Private
@Unstable
@ConfigurationProperty(type = "boolean")
- /**
+ /*
* Expert setting made available only for debugging. Do not change it.
* Provides threshold for determining whether fetching has to be marked
* unhealthy based on the ratio of (failures/(failures+completed))
@@ -432,7 +433,7 @@ public class TezRuntimeConfiguration {
* "host1#Map_1#100": from host1 for Map 1 source tasks with 100% likelihood
* "host1#Map_1#100#fail_only_first": as above but only for input attempts with index 0
*/
- @ConfigurationProperty(type = "string")
+ @ConfigurationProperty()
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG =
TEZ_RUNTIME_PREFIX + "shuffle.fetch.testing.errors.config";
public static final String TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG_DEFAULT = "*#50";
@@ -608,130 +609,130 @@ public class TezRuntimeConfiguration {
public static final long TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT = 10000;
static {
- tezRuntimeKeys.add(TEZ_RUNTIME_IFILE_READAHEAD);
- tezRuntimeKeys.add(TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
- tezRuntimeKeys.add(TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
- tezRuntimeKeys.add(TEZ_RUNTIME_IO_SORT_FACTOR);
- tezRuntimeKeys.add(TEZ_RUNTIME_SORT_SPILL_PERCENT);
- tezRuntimeKeys.add(TEZ_RUNTIME_IO_SORT_MB);
- tezRuntimeKeys.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
- tezRuntimeKeys.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
- tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
- tezRuntimeKeys.add(
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IFILE_READAHEAD);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IO_SORT_FACTOR);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORT_SPILL_PERCENT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_IO_SORT_MB);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_INDEX_CACHE_MEMORY_LIMIT_BYTES);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_COMBINE_MIN_SPILLS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_PIPELINED_SORTER_SORT_THREADS);
+ TEZ_RUNTIME_KEYS.add(
TEZ_RUNTIME_PIPELINED_SORTER_MIN_BLOCK_SIZE_IN_MB);
- tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY);
- tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
- tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
- tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_COMBINER_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
- tezRuntimeKeys.add
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_PIPELINED_SORTER_LAZY_ALLOCATE_MEMORY);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_UNORDERED_OUTPUT_BUFFER_SIZE_MB);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_PARTITIONER_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_COMBINER_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_NOTIFY_READERROR);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_ENABLE_SSL);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FETCH_VERIFY_DISK_CHECKSUM);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FETCH_BUFFER_PERCENT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_MEMORY_LIMIT_PERCENT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_MERGE_PERCENT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_MEMTOMEM_SEGMENTS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_ENABLE_MEMTOMEM);
+ TEZ_RUNTIME_KEYS.add
(TEZ_RUNTIME_SHUFFLE_ACCEPTABLE_HOST_FETCH_FAILURE_FRACTION);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS);
- tezRuntimeKeys.add(TEZ_RUNTIME_REPORT_PARTITION_STATS);
- tezRuntimeKeys.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
- tezRuntimeKeys.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_KEY_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_VALUE_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS);
- tezRuntimeKeys.add(TEZ_RUNTIME_COMPRESS_CODEC);
- tezRuntimeKeys.add(TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
- tezRuntimeKeys.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
- tezRuntimeKeys.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
- tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
- tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
- tezRuntimeKeys.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
- tezRuntimeKeys.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
- tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
- tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
- tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS);
- tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
- tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS);
- tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_BATCH_WAIT);
-
- defaultConf.addResource("core-default.xml");
- defaultConf.addResource("core-site.xml");
- defaultConf.addResource("tez-site.xml");
-
- for (Map.Entry<String, String> confEntry : defaultConf) {
- if (tezRuntimeKeys.contains(confEntry.getKey())) {
- tezRuntimeConfMap.put(confEntry.getKey(), confEntry.getValue());
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_MIN_FAILURES_PER_HOST);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_MAX_STALL_TIME_FRACTION);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_SOURCE_ATTEMPT_ABORT_LIMIT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_MAX_ALLOWED_FAILED_FETCH_ATTEMPT_FRACTION);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_MIN_REQUIRED_PROGRESS_FRACTION);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FAILED_CHECK_SINCE_LAST_COMPLETION);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FETCH_TESTING_ERRORS_CONFIG);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FETCH_ENABLE_TESTING_ERRORS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_REPORT_PARTITION_STATS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_INPUT_POST_MERGE_BUFFER_PERCENT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_GROUP_COMPARATOR_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_INTERNAL_SORTER_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_KEY_COMPARATOR_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_KEY_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_VALUE_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_COMPRESS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_COMPRESS_CODEC);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_EMPTY_PARTITION_INFO_VIA_EVENTS_ENABLED);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_PIPELINED_SHUFFLE_ENABLED);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_OUTPUT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_ENABLED);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_MAX_SIZE);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_TRANSFER_DATA_VIA_EVENTS_SUPPORT_IN_MEM_FILE);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SORTER_CLASS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_UNORDERED_PARTITIONED_KVWRITER_BUFFER_MERGE_PERCENT);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_FETCHER_USE_SHARED_POOL);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_HOST_PENALTY_TIME_LIMIT_MS);
+ TEZ_RUNTIME_KEYS.add(TEZ_RUNTIME_SHUFFLE_BATCH_WAIT);
+
+ DEFAULT_CONF.addResource("core-default.xml");
+ DEFAULT_CONF.addResource("core-site.xml");
+ DEFAULT_CONF.addResource("tez-site.xml");
+
+ for (Map.Entry<String, String> confEntry : DEFAULT_CONF) {
+ if (TEZ_RUNTIME_KEYS.contains(confEntry.getKey())) {
+ TEZ_RUNTIME_CONF_MAP.put(confEntry.getKey(), confEntry.getValue());
} else {
// TODO TEZ-1232 Filter out parameters from TezConfiguration, and Task specific confs
- otherConfMap.put(confEntry.getKey(), confEntry.getValue());
- otherKeys.add(confEntry.getKey());
+ OTHER_CONF_MAP.put(confEntry.getKey(), confEntry.getValue());
+ OTHER_KEYS.add(confEntry.getKey());
}
}
// Do NOT need all prefixes from the following list. Only specific ones are allowed
// "hadoop.", "hadoop.security", "io.", "fs.", "ipc.", "net.", "file.", "dfs.", "ha.", "s3.", "nfs3.", "rpc.", "ssl."
- allowedPrefixes.add("io.");
- allowedPrefixes.add("file.");
- allowedPrefixes.add("fs.");
- allowedPrefixes.add("ssl.");
-
- umnodifiableTezRuntimeKeySet = Collections.unmodifiableSet(tezRuntimeKeys);
- unmodifiableOtherKeySet = Collections.unmodifiableSet(otherKeys);
- unmodifiableAllowedPrefixes = Collections.unmodifiableList(allowedPrefixes);
+ ALLOWED_PREFIXES.add("io.");
+ ALLOWED_PREFIXES.add("file.");
+ ALLOWED_PREFIXES.add("fs.");
+ ALLOWED_PREFIXES.add("ssl.");
+
+ UMNODIFIABLE_TEZ_RUNTIME_KEY_SET = Collections.unmodifiableSet(TEZ_RUNTIME_KEYS);
+ UNMODIFIABLE_OTHER_KEY_SET = Collections.unmodifiableSet(OTHER_KEYS);
+ List<String> unmodifiableAllowedPrefixes = Collections.unmodifiableList(ALLOWED_PREFIXES);
}
@Private
public static Set<String> getRuntimeConfigKeySet() {
- return umnodifiableTezRuntimeKeySet;
+ return UMNODIFIABLE_TEZ_RUNTIME_KEY_SET;
}
@Private
public static Set<String> getRuntimeAdditionalConfigKeySet() {
- return unmodifiableOtherKeySet;
+ return UNMODIFIABLE_OTHER_KEY_SET;
}
@Private
public static List<String> getAllowedPrefixes() {
- return allowedPrefixes;
+ return ALLOWED_PREFIXES;
}
@Private
public static Map<String, String> getTezRuntimeConfigDefaults() {
- return Collections.unmodifiableMap(tezRuntimeConfMap);
+ return Collections.unmodifiableMap(TEZ_RUNTIME_CONF_MAP);
}
@Private
public static Map<String, String> getOtherConfigDefaults() {
- return Collections.unmodifiableMap(otherConfMap);
+ return Collections.unmodifiableMap(OTHER_CONF_MAP);
}
public enum ReportPartitionStats {
@Deprecated
- /**
+ /*
* Don't report partition stats. It is the same as NONE.
* It is defined to maintain backward compatibility given
* Configuration @link{#TEZ_RUNTIME_REPORT_PARTITION_STATS} used
@@ -740,7 +741,7 @@ public class TezRuntimeConfiguration {
DISABLED("false"),
@Deprecated
- /**
+ /*
* Report partition stats. It is the same as MEMORY_OPTIMIZED.
* It is defined to maintain backward compatibility given
* Configuration @link{#TEZ_RUNTIME_REPORT_PARTITION_STATS} used
@@ -766,7 +767,7 @@ public class TezRuntimeConfiguration {
private final String type;
- private ReportPartitionStats(String type) {
+ ReportPartitionStats(String type) {
this.type = type;
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
index 74e74f278..3bb85e33c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/ConfigUtils.java
@@ -37,7 +37,9 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
@SuppressWarnings({"unchecked", "rawtypes"})
@InterfaceAudience.Private
-public class ConfigUtils {
+public final class ConfigUtils {
+
+ private ConfigUtils() {}
public static Class<? extends CompressionCodec> getIntermediateOutputCompressorClass(
Configuration conf, Class<DefaultCodec> defaultValue) {
@@ -64,31 +66,27 @@ public class ConfigUtils {
}
public static <V> Class<V> getIntermediateOutputValueClass(Configuration conf) {
- Class<V> retv = (Class<V>) conf.getClass(
+ return (Class<V>) conf.getClass(
TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, null,
Object.class);
- return retv;
}
public static <V> Class<V> getIntermediateInputValueClass(Configuration conf) {
- Class<V> retv = (Class<V>) conf.getClass(
+ return (Class<V>) conf.getClass(
TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, null,
Object.class);
- return retv;
}
public static <K> Class<K> getIntermediateOutputKeyClass(Configuration conf) {
- Class<K> retv = (Class<K>) conf.getClass(
+ return (Class<K>) conf.getClass(
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, null,
Object.class);
- return retv;
}
public static <K> Class<K> getIntermediateInputKeyClass(Configuration conf) {
- Class<K> retv = (Class<K>) conf.getClass(
+ return (Class<K>) conf.getClass(
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, null,
Object.class);
- return retv;
}
public static <K> RawComparator<K> getIntermediateOutputKeyComparator(Configuration conf) {
@@ -135,7 +133,7 @@ public class ConfigUtils {
public static Map<String, String> extractConfigurationMap(Map<String, String> confMap, Set<String> allowedKeys) {
Objects.requireNonNull(confMap, "ConfMap cannot be null");
Objects.requireNonNull(allowedKeys, "Valid key set cannot be empty");
- Map<String, String> map = new HashMap<String, String>();
+ Map<String, String> map = new HashMap<>();
for (Map.Entry<String, String> entry : confMap.entrySet()) {
if (allowedKeys.contains(entry.getKey())) {
map.put(entry.getKey(), entry.getValue());
@@ -215,11 +213,11 @@ public class ConfigUtils {
private static Map<String, String> extractConfigurationMapInternal(
Iterable<Map.Entry<String, String>> iterable, List<Set<String>> validKeySets, List<String> allowedPrefixes) {
- Set<String> validKeys = new HashSet<String>();
+ Set<String> validKeys = new HashSet<>();
for (Set<String> set : validKeySets) {
validKeys.addAll(set);
}
- Map<String, String> localConfMap = new HashMap<String, String>();
+ Map<String, String> localConfMap = new HashMap<>();
for (Map.Entry<String, String> entry : iterable) {
if (validKeys.contains(entry.getKey())) {
localConfMap.put(entry.getKey(), entry.getValue());
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
index 81921b2e8..e6cf73944 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
@@ -20,7 +20,7 @@ package org.apache.tez.runtime.library.common;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@Private
-public class Constants {
+public final class Constants {
// TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
@@ -64,4 +64,5 @@ public class Constants {
public static final String TEZ_RUNTIME_TASK_OUTPUT_MANAGER =
"tez.runtime.task.local.output.manager";
+ private Constants() {}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
index a75925c5f..32e76f4c1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/TezRuntimeUtils.java
@@ -46,7 +46,7 @@ import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
@Private
-public class TezRuntimeUtils {
+public final class TezRuntimeUtils {
private static final Logger LOG = LoggerFactory
.getLogger(TezRuntimeUtils.class);
@@ -55,6 +55,8 @@ public class TezRuntimeUtils {
//ShufflePort by default for ContainerLaunchers
public static final int INVALID_PORT = -1;
+ private TezRuntimeUtils() {}
+
public static String getTaskIdentifier(String vertexName, int taskIndex) {
return String.format("%s_%06d", vertexName, taskIndex);
}
@@ -85,26 +87,17 @@ public class TezRuntimeUtils {
throw new IOException("Unable to load combiner class: " + className);
}
- Combiner combiner = null;
+ Combiner combiner;
- Constructor<? extends Combiner> ctor;
- try {
- ctor = clazz.getConstructor(TaskContext.class);
- combiner = ctor.newInstance(taskContext);
- } catch (SecurityException e) {
- throw new IOException(e);
- } catch (NoSuchMethodException e) {
- throw new IOException(e);
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
- }
- return combiner;
+ Constructor<? extends Combiner> ctor;
+ try {
+ ctor = clazz.getConstructor(TaskContext.class);
+ combiner = ctor.newInstance(taskContext);
+ } catch (SecurityException | NoSuchMethodException | IllegalArgumentException | InstantiationException
+ | IllegalAccessException | InvocationTargetException e) {
+ throw new IOException(e);
+ }
+ return combiner;
}
@SuppressWarnings("unchecked")
@@ -123,31 +116,22 @@ public class TezRuntimeUtils {
LOG.debug("Using partitioner class: " + clazz.getName());
}
- Partitioner partitioner = null;
+ Partitioner partitioner;
try {
Constructor<? extends Partitioner> ctorWithConf = clazz
.getConstructor(Configuration.class);
partitioner = ctorWithConf.newInstance(conf);
- } catch (SecurityException e) {
+ } catch (SecurityException | IllegalArgumentException | InstantiationException | IllegalAccessException
+ | InvocationTargetException e) {
throw new IOException(e);
} catch (NoSuchMethodException e) {
try {
// Try a 0 argument constructor.
partitioner = clazz.newInstance();
- } catch (InstantiationException e1) {
- throw new IOException(e1);
- } catch (IllegalAccessException e1) {
+ } catch (InstantiationException | IllegalAccessException e1) {
throw new IOException(e1);
}
- } catch (IllegalArgumentException e) {
- throw new IOException(e);
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- } catch (InvocationTargetException e) {
- throw new IOException(e);
}
return partitioner;
}
@@ -158,10 +142,9 @@ public class TezRuntimeUtils {
try {
Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class, int.class);
ctor.setAccessible(true);
- TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf,
+ return (TezTaskOutput) ctor.newInstance(conf,
outputContext.getUniqueIdentifier(),
outputContext.getDagIdentifier());
- return instance;
} catch (Exception e) {
throw new TezUncheckedException(
"Unable to instantiate configured TezOutputFileManager: "
@@ -183,7 +166,7 @@ public class TezRuntimeUtils {
sb.append("&job=");
sb.append(appId.replace("application", "job"));
sb.append("&dag=");
- sb.append(String.valueOf(dagIdentifier));
+ sb.append(dagIdentifier);
return new URL(sb.toString());
}
@@ -200,9 +183,9 @@ public class TezRuntimeUtils {
sb.append("&job=");
sb.append(appId.replace("application", "job"));
sb.append("&dag=");
- sb.append(String.valueOf(dagIdentifier));
+ sb.append(dagIdentifier);
sb.append("&vertex=");
- sb.append(String.valueOf(vertexIndentifier));
+ sb.append(vertexIndentifier);
return new URL(sb.toString());
}
@@ -219,9 +202,9 @@ public class TezRuntimeUtils {
sb.append("&job=");
sb.append(appId.replace("application", "job"));
sb.append("&dag=");
- sb.append(String.valueOf(dagIdentifier));
+ sb.append(dagIdentifier);
sb.append("&map=");
- sb.append(String.valueOf(taskAttemptIdentifier));
+ sb.append(taskAttemptIdentifier);
return new URL(sb.toString());
}
@@ -269,10 +252,9 @@ public class TezRuntimeUtils {
}
}
- HttpConnectionParams httpConnParams = new HttpConnectionParams(keepAlive,
+ return new HttpConnectionParams(keepAlive,
keepAliveMaxConnections, connectionTimeout, readTimeout, bufferSize, sslShuffle,
sslFactory);
- return httpConnParams;
}
public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url,
@@ -288,13 +270,9 @@ public class TezRuntimeUtils {
public static int deserializeShuffleProviderMetaData(ByteBuffer meta)
throws IOException {
- DataInputByteBuffer in = new DataInputByteBuffer();
- try {
+ try (DataInputByteBuffer in = new DataInputByteBuffer()) {
in.reset(meta);
- int port = in.readInt();
- return port;
- } finally {
- in.close();
+ return in.readInt();
}
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
index 67b8de27c..19fbf2d08 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/security/SecureShuffleUtils.java
@@ -37,13 +37,14 @@ import org.apache.tez.common.security.JobTokenSecretManager;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class SecureShuffleUtils {
+public final class SecureShuffleUtils {
public static final String HTTP_HEADER_URL_HASH = "UrlHash";
public static final String HTTP_HEADER_REPLY_URL_HASH = "ReplyHash";
+
+ private SecureShuffleUtils() {}
/**
* Base64 encoded hash of msg
- * @param msg
*/
public static String generateHash(byte[] msg, SecretKey key) {
return new String(Base64.encodeBase64(generateByteHash(msg, key)), Charsets.UTF_8);
@@ -51,7 +52,7 @@ public class SecureShuffleUtils {
/**
* calculate hash of msg
- * @param msg
+ *
* @return byte array containing computed hash of message
*/
private static byte[] generateByteHash(byte[] msg, SecretKey key) {
@@ -63,9 +64,6 @@ public class SecureShuffleUtils {
* This is only meant to be used when a process needs to verify against multiple different keys
* (ShuffleHandler for instance)
*
- * @param hash
- * @param msg
- * @param key
* @return true when hashes match; false otherwise
*/
private static boolean verifyHash(byte[] hash, byte[] msg, SecretKey key) {
@@ -75,9 +73,7 @@ public class SecureShuffleUtils {
/**
* verify that hash equals to HMacHash(msg)
- * @param hash
- * @param msg
- * @param mgr JobTokenSecretManager
+ *
* @return true when hashes match; false otherwise
*/
private static boolean verifyHash(byte[] hash, byte[] msg, JobTokenSecretManager mgr) {
@@ -87,14 +83,10 @@ public class SecureShuffleUtils {
/**
* Aux util to calculate hash of a String
- * @param enc_str
- * @param mgr JobTokenSecretManager
- * @return Base64 encodedHash
- * @throws IOException
+ *
*/
- public static String hashFromString(String enc_str, JobTokenSecretManager mgr)
- throws IOException {
- return new String(Base64.encodeBase64(mgr.computeHash(enc_str.getBytes(Charsets.UTF_8))), Charsets.UTF_8);
+ public static String hashFromString(String encStr, JobTokenSecretManager mgr) {
+ return new String(Base64.encodeBase64(mgr.computeHash(encStr.getBytes(Charsets.UTF_8))), Charsets.UTF_8);
}
/**
@@ -106,13 +98,12 @@ public class SecureShuffleUtils {
* @param base64Hash base64 encoded hash
* @param msg the message
* @param key the key to use to generate the hash from the message
- * @throws IOException
*/
public static void verifyReply(String base64Hash, String msg, SecretKey key) throws IOException {
byte[] hash = Base64.decodeBase64(base64Hash.getBytes(Charsets.UTF_8));
boolean res = verifyHash(hash, msg.getBytes(Charsets.UTF_8), key);
- if(res != true) {
+ if(!res) {
throw new IOException("Verification of the hashReply failed");
}
}
@@ -120,7 +111,7 @@ public class SecureShuffleUtils {
/**
* verify that base64Hash is same as HMacHash(msg)
* @param base64Hash (Base64 encoded hash)
- * @param msg
+ * @param msg the message
* @throws IOException if not the same
*/
public static void verifyReply(String base64Hash, String msg, JobTokenSecretManager mgr)
@@ -129,14 +120,14 @@ public class SecureShuffleUtils {
boolean res = verifyHash(hash, msg.getBytes(Charsets.UTF_8), mgr);
- if(res != true) {
+ if(!res) {
throw new IOException("Verification of the hashReply failed");
}
}
/**
* Shuffle specific utils - build string for encoding from URL
- * @param url
+ *
* @return string for encoding
*/
public static String buildMsgFrom(URL url) {
@@ -145,11 +136,10 @@ public class SecureShuffleUtils {
/**
* Shuffle specific utils - build string for encoding from URL
- * @param uri_path
- * @param uri_query
+ *
* @return string for encoding
*/
private static String buildMsgFrom(String uri_path, String uri_query, int port) {
- return String.valueOf(port) + uri_path + "?" + uri_query;
+ return port + uri_path + "?" + uri_query;
}
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index ca8b84724..c5fb65116 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -72,7 +72,7 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovementEventPayloadProto;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DetailedPartitionStatsProto;
-public class ShuffleUtils {
+public final class ShuffleUtils {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
private static final long MB = 1024l * 1024l;
@@ -94,6 +94,8 @@ public class ShuffleUtils {
}
};
+ private ShuffleUtils() {}
+
public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
throws IOException {
DataInputByteBuffer in = new DataInputByteBuffer();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
index 8f673185b..671a426b6 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/IFile.java
@@ -62,7 +62,7 @@ import org.apache.tez.common.counters.TezCounter;
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class IFile {
+public final class IFile {
private static final Logger LOG = LoggerFactory.getLogger(IFile.class);
public static final int EOF_MARKER = -1; // End of File Marker
public static final int RLE_MARKER = -2; // Repeat same key marker
@@ -74,6 +74,8 @@ public class IFile {
private static final String INCOMPLETE_READ = "Requested to read %d got %d";
private static final String REQ_BUFFER_SIZE_TOO_LARGE = "Size of data %d is greater than the max allowed of %d";
+ private IFile() {}
+
/**
* IFileWriter which stores data in memory for specified limit, beyond
* which it falls back to file based writer. It creates files lazily on
@@ -91,34 +93,22 @@ public class IFile {
*/
public static class FileBackedInMemIFileWriter extends Writer {
- private FileSystem fs;
+ private final FileSystem fs;
private boolean bufferFull;
// For lazy creation of file
- private TezTaskOutput taskOutput;
+ private final TezTaskOutput taskOutput;
private int totalSize;
private Path outputPath;
- private CompressionCodec fileCodec;
- private BoundedByteArrayOutputStream cacheStream;
+ private final CompressionCodec fileCodec;
+ private final BoundedByteArrayOutputStream cacheStream;
private static final int checksumSize = IFileOutputStream.getCheckSumSize();
/**
* Note that we do not allow compression in in-mem stream.
* When spilled over to file, compression gets enabled.
- *
- * @param keySerialization
- * @param valSerialization
- * @param fs
- * @param taskOutput
- * @param keyClass
- * @param valueClass
- * @param codec
- * @param writesCounter
- * @param serializedBytesCounter
- * @param cacheSize
- * @throws IOException
*/
public FileBackedInMemIFileWriter(Serialization<?> keySerialization,
Serialization<?> valSerialization, FileSystem fs, TezTaskOutput taskOutput,
@@ -151,7 +141,6 @@ public class IFile {
/**
* Create in mem stream. In it is too small, adjust it's size
*
- * @param size
* @return in memory stream
*/
public static BoundedByteArrayOutputStream createBoundedBuffer(int size) {
@@ -168,8 +157,6 @@ public class IFile {
* out.
* 3. Create relevant file based writer.
* 4. Write header and then real data.
- *
- * @throws IOException
*/
private void resetToFileBasedWriter() throws IOException {
// Close out stream, so that data checksums are written.
@@ -409,7 +396,7 @@ public class IFile {
// Write EOF_MARKER for key/value length
WritableUtils.writeVInt(out, EOF_MARKER);
WritableUtils.writeVInt(out, EOF_MARKER);
- decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
+ decompressedBytesWritten += 2L * WritableUtils.getVIntSize(EOF_MARKER);
//account for header bytes
decompressedBytesWritten += HEADER.length;
@@ -451,10 +438,6 @@ public class IFile {
* one, send IFile.REPEAT_KEY as key parameter. Should not call this method with
* IFile.REPEAT_KEY as the first key. It is caller's responsibility to ensure that correct
* key/value type checks and key/value length (non-negative) checks are done properly.
- *
- * @param key
- * @param value
- * @throws IOException
*/
public void append(Object key, Object value) throws IOException {
int keyLength = 0;
@@ -493,9 +476,6 @@ public class IFile {
/**
* Appends the value to previous key. Assumes that the caller has already done relevant checks
* for identical keys. Also, no validations are done in this method
- *
- * @param value
- * @throws IOException
*/
public void appendValue(Object value) throws IOException {
valueSerializer.serialize(value);
@@ -511,9 +491,6 @@ public class IFile {
* for identical keys. Also, no validations are done in this method. It is caller's responsibility
* to pass non-negative key/value lengths. Otherwise,IndexOutOfBoundsException could be
* thrown at runtime.
- *
- * @param value
- * @throws IOException
*/
public void appendValue(DataInputBuffer value) throws IOException {
int valueLength = value.getLength() - value.getPosition();
@@ -527,9 +504,6 @@ public class IFile {
/**
* Appends the value to previous key. Assumes that the caller has already done relevant checks
* for identical keys. Also, no validations are done in this method
- *
- * @param valuesItr
- * @throws IOException
*/
public <V> void appendValues(Iterator<V> valuesItr) throws IOException {
while(valuesItr.hasNext()) {
@@ -539,12 +513,6 @@ public class IFile {
/**
* Append key and its associated set of values.
- *
- * @param key
- * @param valuesItr
- * @param <K>
- * @param <V>
- * @throws IOException
*/
public <K, V> void appendKeyValues(K key, Iterator<V> valuesItr) throws IOException {
if (valuesItr.hasNext()) {
@@ -561,11 +529,6 @@ public class IFile {
* one, send IFile.REPEAT_KEY as key parameter. Should not call this method with
* IFile.REPEAT_KEY as the first key. It is caller's responsibility to pass non-negative
* key/value lengths. Otherwise,IndexOutOfBoundsException could be thrown at runtime.
- *
- *
- * @param key
- * @param value
- * @throws IOException
*/
public void append(DataInputBuffer key, DataInputBuffer value) throws IOException {
int keyLength = key.getLength() - key.getPosition();
@@ -623,7 +586,7 @@ public class IFile {
}
protected void writeRLE(DataOutputStream out) throws IOException {
- /**
+ /*
* To strike a balance between 2 use cases (lots of unique KV in stream
* vs lots of identical KV in stream), we start off by writing KV pair.
* If subsequent KV is identical, we write RLE marker along with V_END_MARKER
@@ -638,7 +601,7 @@ public class IFile {
}
protected void writeValueMarker(DataOutputStream out) throws IOException {
- /**
+ /*
* Write V_END_MARKER only in RLE scenario. This will
* save space in conditions where lots of unique KV pairs are found in the
* stream.
@@ -706,7 +669,7 @@ public class IFile {
protected int recNo = 1;
protected int originalKeyLength;
protected int prevKeyLength;
- byte keyBytes[] = new byte[0];
+ private byte[] keyBytes = new byte[0];
protected int currentKeyLength;
protected int currentValueLength;
@@ -720,7 +683,6 @@ public class IFile {
* checksum bytes for the data at the end of the file.
* @param codec codec
* @param readsCounter Counter for records read from disk
- * @throws IOException
*/
public Reader(FileSystem fs, Path file,
CompressionCodec codec,
@@ -739,7 +701,6 @@ public class IFile {
* bytes.
* @param codec codec
* @param readsCounter Counter for records read from disk
- * @throws IOException
*/
public Reader(InputStream in, long length,
CompressionCodec codec,
@@ -748,7 +709,7 @@ public class IFile {
int bufferSize) throws IOException {
this(in, ((in != null) ? (length - HEADER.length) : length), codec,
readsCounter, bytesReadCounter, readAhead, readAheadLength,
- bufferSize, ((in != null) ? isCompressedFlagEnabled(in) : false));
+ bufferSize, (in != null && isCompressedFlagEnabled(in)));
if (in != null && bytesReadCounter != null) {
bytesReadCounter.increment(IFile.HEADER.length);
}
@@ -762,7 +723,6 @@ public class IFile {
* bytes.
* @param codec codec
* @param readsCounter Counter for records read from disk
- * @throws IOException
*/
public Reader(InputStream in, long length,
CompressionCodec codec,
@@ -799,14 +759,6 @@ public class IFile {
/**
* Read entire ifile content to memory.
- *
- * @param buffer
- * @param in
- * @param compressedLength
- * @param codec
- * @param ifileReadAhead
- * @param ifileReadAheadLength
- * @throws IOException
*/
public static void readToMemory(byte[] buffer, InputStream in, int compressedLength,
CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength)
@@ -825,7 +777,6 @@ public class IFile {
compressedLength);
} else {
LOG.warn("Could not obtain decompressor from CodecPool");
- in = checksumIn;
}
}
try {
@@ -863,7 +814,6 @@ public class IFile {
* @param in the input stream containing the IFile data
* @param length the amount of data to read from the input
* @return the number of bytes copied
- * @throws IOException
*/
public static long readToDisk(OutputStream out, InputStream in, long length,
boolean ifileReadAhead, int ifileReadAheadLength)
@@ -908,7 +858,6 @@ public class IFile {
* @param off offset
* @param len length of buffer
* @return the no. of bytes read
- * @throws IOException
*/
private int readData(byte[] buf, int off, int len) throws IOException {
int bytesRead = 0;
@@ -949,7 +898,6 @@ public class IFile {
* @param dIn
* @return true if key length and value length were set to the next
* false if end of file (EOF) marker was reached
- * @throws IOException
*/
protected boolean positionToNextRecord(DataInput dIn) throws IOException {
// Sanity check
@@ -1087,9 +1035,7 @@ public class IFile {
}
}
- public void reset(int offset) {
- return;
- }
+ public void reset(int offset) {}
public void disableChecksumValidation() {
checksumIn.disableChecksumValidation();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 12e573519..d22988533 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -61,14 +61,16 @@ import org.apache.tez.runtime.library.utils.LocalProgress;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@SuppressWarnings({"unchecked", "rawtypes"})
-public class TezMerger {
+public final class TezMerger {
private static final Logger LOG = LoggerFactory.getLogger(TezMerger.class);
// Local directories
- private static LocalDirAllocator lDirAlloc =
+ private static final LocalDirAllocator L_DIR_ALLOC =
new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+ private TezMerger() {}
+
public static
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
SerializationContext serializationContext,
@@ -110,7 +112,7 @@ public class TezMerger {
mergePhase);
}
- public static <K extends Object, V extends Object>
+ public static <K, V>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
SerializationContext serializationContext,
List<Segment> segments,
@@ -144,7 +146,7 @@ public class TezMerger {
readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
- public static <K extends Object, V extends Object>
+ public static <K, V>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
SerializationContext serializationContext,
CompressionCodec codec,
@@ -167,7 +169,7 @@ public class TezMerger {
mergePhase);
}
- public static <K extends Object, V extends Object>
+ public static <K, V>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
SerializationContext serializationContext,
CompressionCodec codec,
@@ -189,7 +191,7 @@ public class TezMerger {
mergePhase);
}
- public static <K extends Object, V extends Object>
+ public static <K, V>
TezRawKeyValueIterator merge(Configuration conf, FileSystem fs,
SerializationContext serializationContext,
CompressionCodec codec,
@@ -211,7 +213,7 @@ public class TezMerger {
mergePhase);
}
- public static <K extends Object, V extends Object>
+ public static <K, V>
void writeFile(TezRawKeyValueIterator records, Writer writer,
Progressable progressable, long recordsBeforeProgress)
throws IOException, InterruptedException {
@@ -228,7 +230,7 @@ public class TezMerger {
if (((recordCtr++) % recordsBeforeProgress) == 0) {
progressable.progress();
if (Thread.currentThread().isInterrupted()) {
- /**
+ /*
* Takes care DefaultSorter.mergeParts, MergeManager's merger threads,
* PipelinedSorter's flush(). This is not expensive check as it is carried out every
* 10000 records or so.
@@ -250,7 +252,7 @@ public class TezMerger {
private int position;
private int length;
- public KeyValueBuffer(byte buf[], int position, int length) {
+ KeyValueBuffer(byte[] buf, int position, int length) {
reset(buf, position, length);
}
@@ -279,7 +281,7 @@ public class TezMerger {
static final byte[] EMPTY_BYTES = new byte[0];
Reader reader = null;
final KeyValueBuffer key = new KeyValueBuffer(EMPTY_BYTES, 0, 0);
- TezCounter mapOutputsCounter = null;
+ private TezCounter mapOutputsCounter;
public Segment(Reader reader, TezCounter mapOutputsCounter) {
this.reader = reader;
@@ -358,15 +360,17 @@ public class TezMerger {
@InterfaceStability.Unstable
public static class DiskSegment extends Segment {
- FileSystem fs = null;
- Path file = null;
- boolean preserve = false; // Signifies whether the segment should be kept after a merge is complete. Checked in the close method.
- CompressionCodec codec = null;
- long segmentOffset = 0;
- long segmentLength = -1;
+ private FileSystem fs;
+ private Path file;
+
+ // Signifies whether the segment should be kept after a merge is complete. Checked in the close method.
+ private boolean preserve;
+ private CompressionCodec codec;
+ private long segmentOffset;
+ private long segmentLength;
boolean ifileReadAhead;
int ifileReadAheadLength;
- int bufferSize = -1;
+ private int bufferSize;
public DiskSegment(FileSystem fs, Path file,
CompressionCodec codec, boolean ifileReadAhead,
@@ -389,7 +393,7 @@ public class TezMerger {
long segmentOffset, long segmentLength,
CompressionCodec codec, boolean ifileReadAhead,
int ifileReadAheadLength, int bufferSize,
- boolean preserve) throws IOException {
+ boolean preserve) {
this(fs, file, segmentOffset, segmentLength, codec, ifileReadAhead,
ifileReadAheadLength, bufferSize, preserve, null);
}
@@ -397,8 +401,7 @@ public class TezMerger {
public DiskSegment(FileSystem fs, Path file,
long segmentOffset, long segmentLength, CompressionCodec codec,
boolean ifileReadAhead, int ifileReadAheadLength, int bufferSize,
- boolean preserve, TezCounter mergedMapOutputsCounter)
- throws IOException {
+ boolean preserve, TezCounter mergedMapOutputsCounter) {
super(null, mergedMapOutputsCounter);
this.fs = fs;
this.file = file;
@@ -460,7 +463,7 @@ public class TezMerger {
}
@VisibleForTesting
- static class MergeQueue<K extends Object, V extends Object>
+ static class MergeQueue<K, V>
extends PriorityQueue<Segment> implements TezRawKeyValueIterator {
final Configuration conf;
final FileSystem fs;
@@ -471,7 +474,7 @@ public class TezMerger {
static final int ifileBufferSize = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_BUFFER_SIZE_DEFAULT;
static final long recordsBeforeProgress = TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT;
- List<Segment> segments = new ArrayList<Segment>();
+ private List<Segment> segments = new ArrayList<>();
final RawComparator comparator;
@@ -537,7 +540,7 @@ public class TezMerger {
}
// Sort segments on file-lengths
- Collections.sort(segments, segmentComparator);
+ segments.sort(segmentComparator);
}
public MergeQueue(Configuration conf, FileSystem fs,
@@ -566,7 +569,7 @@ public class TezMerger {
this.reporter = reporter;
this.considerFinalMergeForProgress = considerFinalMergeForProgress;
if (sortSegments) {
- Collections.sort(segments, segmentComparator);
+ segments.sort(segmentComparator);
}
this.checkForSameKeys = checkForSameKeys;
this.codec = codec;
@@ -596,7 +599,7 @@ public class TezMerger {
long startPos = reader.getPosition();
if (checkForSameKeys) {
if (hasNext == null) {
- /**
+ /*
* hasNext can be null during first iteration & prevKey is initialized here.
* In cases of NO_KEY/NEW_KEY, we readjust the queue later. If new segment/file is found
* during this process, we need to compare keys for RLE across segment boundaries.
@@ -606,7 +609,7 @@ public class TezMerger {
} else {
//indicates a key has been read already
if (hasNext != KeyState.SAME_KEY) {
- /**
+ /*
* Store previous key before reading next for later key comparisons.
* If all keys in a segment are unique, it would always hit this code path and key copies
* are wasteful in such condition, as these comparisons are mainly done for RLE.
@@ -635,9 +638,6 @@ public class TezMerger {
/**
* Check if the previous key is same as the next top segment's key.
* This would be useful to compute whether same key is spread across multiple segments.
- *
- * @param current
- * @throws IOException
*/
void compareKeyWithNextTopKey(Segment current) throws IOException {
Segment nextTop = top();
@@ -759,7 +759,7 @@ public class TezMerger {
factor += inMem;
}
List<Segment> segmentsToMerge =
- new ArrayList<Segment>();
+ new ArrayList<>();
int segmentsConsidered = 0;
int numSegmentsToConsider = factor;
long startBytes = 0; // starting bytes of segments of this merge
@@ -816,8 +816,8 @@ public class TezMerger {
// the 3rd phase of reduce task.
totalBytesProcessed = 0;
totalBytes = 0;
- for (int i = 0; i < segmentsToMerge.size(); i++) {
- totalBytes += segmentsToMerge.get(i).getLength();
+ for (Segment segment : segmentsToMerge) {
+ totalBytes += segment.getLength();
}
}
if (totalBytes != 0) //being paranoid
@@ -860,7 +860,7 @@ public class TezMerger {
Path tmpFilename =
new Path(tmpDir, "intermediate").suffix("." + passNo);
- Path outputFile = lDirAlloc.getLocalPathForWrite(
+ Path outputFile = L_DIR_ALLOC.getLocalPathForWrite(
tmpFilename.toString(),
approxOutputSize, conf);
@@ -933,7 +933,7 @@ public class TezMerger {
*/
private List<Segment> getSegmentDescriptors(int numDescriptors) {
if (numDescriptors > segments.size()) {
- List<Segment> subList = new ArrayList<Segment>(segments);
+ List<Segment> subList = new ArrayList<>(segments);
segments.clear();
return subList;
}
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
index c1b44a29d..768ac6e8c 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/conf/Utils.java
@@ -22,7 +22,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.tez.dag.api.EdgeProperty;
@Private
-class Utils {
+final class Utils {
+
+ private Utils() {}
/**
* Modify the EdgeProperty to set the history text if available
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java
index a1685edcf..9f31a09fa 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/BufferUtils.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@Private
-public class BufferUtils {
+public final class BufferUtils {
+
+ private BufferUtils() {}
+
public static int compare(DataInputBuffer buf1, DataInputBuffer buf2) {
byte[] b1 = buf1.getData();
byte[] b2 = buf2.getData();
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java
index 4bd255268..3743c2a2f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/utils/FastByteComparisons.java
@@ -1,6 +1,6 @@
package org.apache.tez.runtime.library.utils;
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -37,6 +37,8 @@ import com.google.common.primitives.UnsignedBytes;
*/
final class FastByteComparisons {
+ private FastByteComparisons() {}
+
/**
* Lexicographically compare two byte arrays.
*/
@@ -48,8 +50,8 @@ final class FastByteComparisons {
private interface Comparer<T> {
- abstract public int compareTo(T buffer1, int offset1, int length1,
- T buffer2, int offset2, int length2);
+ int compareTo(T buffer1, int offset1, int length1,
+ T buffer2, int offset2, int length2);
}
private static Comparer<byte[]> lexicographicalComparerJavaImpl() {
@@ -131,12 +133,10 @@ final class FastByteComparisons {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return f.get(null);
- } catch (NoSuchFieldException e) {
+ } catch (NoSuchFieldException | IllegalAccessException e) {
// It doesn't matter what we throw;
// it's swallowed in getBestComparer().
throw new Error();
- } catch (IllegalAccessException e) {
- throw new Error();
}
}
});
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index c6574d74f..6512853c5 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -59,6 +59,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -66,24 +67,24 @@ public class TestTezMerger {
private static final Logger LOG = LoggerFactory.getLogger(TestTezMerger.class);
- private static Configuration defaultConf = new Configuration();
- private static FileSystem localFs = null;
- private static Path workDir = null;
- private static RawComparator comparator = null;
- private static Random rnd = new Random();
+ private static final Configuration DEFAULT_CONF = new Configuration();
+ private static FileSystem localFs;
+ private static Path workDir;
+ private static RawComparator comparator;
+ private static final Random RND = new Random();
private static final String SAME_KEY = "SAME_KEY";
private static final String DIFF_KEY = "DIFF_KEY";
//store the generated data for final verification
- private static ListMultimap<Integer, Long> verificationDataSet = LinkedListMultimap.create();
+ private static final ListMultimap<Integer, Long> VERIFICATION_DATA_SET = LinkedListMultimap.create();
- private MergeManager merger = mock(MergeManager.class);
+ private final MergeManager merger = mock(MergeManager.class);
static {
- defaultConf.set("fs.defaultFS", "file:///");
+ DEFAULT_CONF.set("fs.defaultFS", "file:///");
try {
- localFs = FileSystem.getLocal(defaultConf);
+ localFs = FileSystem.getLocal(DEFAULT_CONF);
workDir = new Path(
new Path(System.getProperty("test.build.data", "/tmp")), TestTezMerger.class.getName())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
@@ -91,12 +92,12 @@ public class TestTezMerger {
} catch (IOException e) {
throw new RuntimeException(e);
}
- defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
- defaultConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, LongWritable.class.getName());
+ DEFAULT_CONF.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+ DEFAULT_CONF.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, LongWritable.class.getName());
Path baseDir = new Path(workDir, TestMergeManager.class.getName());
String localDirs = baseDir.toString();
- defaultConf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
- comparator = ConfigUtils.getIntermediateInputKeyComparator(defaultConf);
+ DEFAULT_CONF.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, localDirs);
+ comparator = ConfigUtils.getIntermediateInputKeyComparator(DEFAULT_CONF);
}
@AfterClass
@@ -106,7 +107,7 @@ public class TestTezMerger {
@Test(timeout = 80000)
public void testMerge() throws Exception {
- /**
+ /*
* test with number of files, keys per file and mergefactor
*/
@@ -128,11 +129,10 @@ public class TestTezMerger {
merge(5, 1000, 100);
//Create random mix of files (empty files + files with keys)
- List<Path> pathList = new LinkedList<Path>();
- pathList.clear();
- pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), 0));
- pathList.addAll(createIFiles(Math.max(2, rnd.nextInt(20)), Math.max(2, rnd.nextInt(10))));
- merge(pathList, Math.max(2, rnd.nextInt(10)));
+ List<Path> pathList = new LinkedList<>();
+ pathList.addAll(createIFiles(Math.max(2, RND.nextInt(20)), 0));
+ pathList.addAll(createIFiles(Math.max(2, RND.nextInt(20)), Math.max(2, RND.nextInt(10))));
+ merge(pathList, Math.max(2, RND.nextInt(10)));
}
private Path createIFileWithTextData(List<String> data) throws IOException {
@@ -150,10 +150,6 @@ public class TestTezMerger {
/**
* Verify if the records are as per the expected data set
- *
- * @param records
- * @param expectedResult
- * @throws IOException
*/
private void verify(TezRawKeyValueIterator records, String[][] expectedResult)
throws IOException {
@@ -168,7 +164,7 @@ public class TestTezMerger {
Text v = new Text();
v.readFields(value);
- assertTrue(k.toString().equals(expectedResult[i][0]));
+ assertEquals(k.toString(), expectedResult[i][0]);
String correctResult = expectedResult[i][1];
@@ -186,7 +182,7 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_WithEmptyStrings() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
//Merge datasets with custom comparator
RawComparator rc = new CustomComparator();
@@ -236,7 +232,7 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_No_RLE() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
//Merge datasets with custom comparator
RawComparator rc = new CustomComparator();
@@ -285,14 +281,13 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_RLE_acrossFiles() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
LOG.info("Test with custom comparator with RLE spanning across segment boundaries");
//Test with 2 files, where the RLE keys can span across files
//First file
- data.clear();
data.add("0");
data.add("0");
pathList.add(createIFileWithTextData(data));
@@ -325,14 +320,13 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_mixedFiles() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
LOG.info("Test with custom comparator with mixed set of segments (empty, non-empty etc)");
//Test with 2 files, where the RLE keys can span across files
//First file
- data.clear();
data.add("0");
pathList.add(createIFileWithTextData(data));
@@ -374,7 +368,7 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_RLE() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
LOG.info("Test with custom comparator 2 files one containing RLE and also other segment "
@@ -382,7 +376,6 @@ public class TestTezMerger {
//Test with 2 files, same keys in middle of file
//First file
- data.clear();
data.add("1");
data.add("2");
data.add("2");
@@ -413,7 +406,7 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_RLE2() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
LOG.info(
@@ -421,7 +414,6 @@ public class TestTezMerger {
//Test with 3 files, same keys in middle of file
//First file
- data.clear();
data.add("0");
data.add("1");
data.add("1");
@@ -462,7 +454,7 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
LOG.info(
@@ -470,7 +462,6 @@ public class TestTezMerger {
//Test with 3 files
//First file
- data.clear();
data.add("0");
pathList.add(createIFileWithTextData(data));
@@ -500,14 +491,13 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_RLE3() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
LOG.info("Test with custom comparator");
//Test with 3 files, same keys in middle of file
//First file
- data.clear();
data.add("0");
pathList.add(createIFileWithTextData(data));
@@ -535,7 +525,7 @@ public class TestTezMerger {
@Test(timeout = 5000)
public void testWithCustomComparator_allEmptyFiles() throws Exception {
- List<Path> pathList = new LinkedList<Path>();
+ List<Path> pathList = new LinkedList<>();
List<String> data = Lists.newLinkedList();
LOG.info("Test with custom comparator where all files are empty");
@@ -561,16 +551,10 @@ public class TestTezMerger {
/**
* Merge the data sets
- *
- * @param pathList
- * @param rc
- * @return
- * @throws IOException
*/
private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc)
throws IOException, InterruptedException {
- TezMerger merger = new TezMerger();
- TezRawKeyValueIterator records = merger.merge(defaultConf, localFs,
+ TezRawKeyValueIterator records = TezMerger.merge(DEFAULT_CONF, localFs,
new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
new WritableSerialization()),
null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), true, 4,
@@ -604,8 +588,7 @@ public class TestTezMerger {
private void merge(List<Path> pathList, int mergeFactor, RawComparator rc) throws Exception {
//Merge datasets
- TezMerger merger = new TezMerger();
- TezRawKeyValueIterator records = merger.merge(defaultConf, localFs,
+ TezRawKeyValueIterator records = TezMerger.merge(DEFAULT_CONF, localFs,
new SerializationContext(IntWritable.class, LongWritable.class, new WritableSerialization(),
new WritableSerialization()),
null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]), true, mergeFactor,
@@ -613,7 +596,7 @@ public class TestTezMerger {
new Reporter(), null, null, null, new Progress());
verifyData(records);
- verificationDataSet.clear();
+ VERIFICATION_DATA_SET.clear();
}
private void verifyData(TezRawKeyValueIterator records) throws IOException {
@@ -632,9 +615,9 @@ public class TestTezMerger {
if (records.isSameKey()) {
LOG.info("\tSame Key : key=" + k.get() + ", val=" + v.get());
//More than one key should be present in the source data
- assertTrue(verificationDataSet.get(k.get()).size() > 1);
+ assertTrue(VERIFICATION_DATA_SET.get(k.get()).size() > 1);
//Ensure this is same as the previous key we saw
- assertTrue("previousKey=" + pk + ", current=" + k.get(), pk == k.get());
+ assertEquals("previousKey=" + pk + ", current=" + k.get(), pk, k.get());
} else {
LOG.info("key=" + k.get() + ", val=" + v.get());
}
@@ -645,21 +628,20 @@ public class TestTezMerger {
}
//Verify if the number of distinct entries is the same in source and the test
- assertTrue("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
- verificationDataSet.keySet().size(),
- dataMap.keySet().size() == verificationDataSet.keySet().size());
+ assertEquals("dataMap=" + dataMap.keySet().size() + ", verificationSet=" +
+ VERIFICATION_DATA_SET.keySet().size(), dataMap.keySet().size(), VERIFICATION_DATA_SET.keySet().size());
//Verify with source data
- for (Integer key : verificationDataSet.keySet()) {
- assertTrue("Data size for " + key + " not matching with source; dataSize:" + dataMap
- .get(key).intValue() + ", source:" + verificationDataSet.get(key).size(),
- dataMap.get(key).intValue() == verificationDataSet.get(key).size());
+ for (Integer key : VERIFICATION_DATA_SET.keySet()) {
+ assertEquals("Data size for " + key + " not matching with source; dataSize:" + dataMap
+ .get(key) + ", source:" + VERIFICATION_DATA_SET.get(key).size(),
+ (int) dataMap.get(key), VERIFICATION_DATA_SET.get(key).size());
}
//Verify if every key has the same number of repeated items in the source dataset as well
for (Map.Entry<Integer, Integer> entry : dataMap.entrySet()) {
- assertTrue(entry.getKey() + "", verificationDataSet.get(entry.getKey()).size() == entry
- .getValue());
+ assertEquals(entry.getKey() + "", VERIFICATION_DATA_SET.get(entry.getKey()).size(), (int) entry
+ .getValue());
}
LOG.info("******************");
@@ -683,17 +665,17 @@ public class TestTezMerger {
segments.addAll(createInMemorySegments(10, 100));
segments.addAll(createDiskSegments(10, 100));
mergeSegments(segments, 5, true);
- verificationDataSet.clear();
+ VERIFICATION_DATA_SET.clear();
segments.clear();
segments.addAll(createDiskSegments(10, 100));
mergeSegments(segments, 5, true);
- verificationDataSet.clear();
+ VERIFICATION_DATA_SET.clear();
segments.clear();
segments.addAll(createInMemorySegments(3, 100));
mergeSegments(segments, 5, false);
- verificationDataSet.clear();
+ VERIFICATION_DATA_SET.clear();
segments.clear();
}
@@ -701,7 +683,7 @@ public class TestTezMerger {
private void mergeSegments(List<TezMerger.Segment> segmentList, int mergeFactor, boolean
hasDiskSegments) throws Exception {
//Merge datasets
- TezMerger.MergeQueue mergeQueue = new TezMerger.MergeQueue(defaultConf, localFs, segmentList,
+ TezMerger.MergeQueue mergeQueue = new TezMerger.MergeQueue(DEFAULT_CONF, localFs, segmentList,
comparator, new Reporter(), false, false);
TezRawKeyValueIterator records = mergeQueue.merge(
@@ -716,7 +698,7 @@ public class TestTezMerger {
int diskBufLen = mergeQueue.diskIFileValue.getLength();
assertTrue(diskBufLen + " disk buf length should be > 0", (hasDiskSegments == diskBufLen > 0));
- verificationDataSet.clear();
+ VERIFICATION_DATA_SET.clear();
}
private List<TezMerger.Segment> createInMemorySegments(int segmentCount, int keysPerSegment)
@@ -750,7 +732,7 @@ public class TestTezMerger {
longVal.write(v);
key.reset(k.getData(), 0, k.getLength());
value.reset(v.getData(), 0, v.getLength());
- verificationDataSet.put(intKey.get(), longVal.get());
+ VERIFICATION_DATA_SET.put(intKey.get(), longVal.get());
}
private List<TezMerger.Segment> createDiskSegments(int segmentCount, int keysPerSegment) throws
@@ -780,7 +762,7 @@ public class TestTezMerger {
for (Integer key : dataSet.keySet()) {
for (Long value : dataSet.get(key)) {
writer.append(new IntWritable(key), new LongWritable(value));
- verificationDataSet.put(key, value);
+ VERIFICATION_DATA_SET.put(key, value);
}
}
writer.close();
@@ -793,7 +775,6 @@ public class TestTezMerger {
*
* @param keyCount approximate number of keys to be created
* @param repeatCount number of times a key should be repeated
- * @return
*/
static TreeMultimap<Integer, Long> createDataForIFile(int keyCount, int repeatCount) {
TreeMultimap<Integer, Long> dataSet = TreeMultimap.create();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
index 573b3e9a1..73a55d942 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/OutputTestHelpers.java
@@ -35,7 +35,10 @@ import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-class OutputTestHelpers {
+final class OutputTestHelpers {
+
+ private OutputTestHelpers() {}
+
static OutputContext createOutputContext() throws IOException {
OutputContext outputContext = mock(OutputContext.class);
Configuration conf = new TezConfiguration();
@@ -47,7 +50,7 @@ class OutputTestHelpers {
doReturn("destinationVertex").when(outputContext).getDestinationVertexName();
doReturn(payLoad).when(outputContext).getUserPayload();
doReturn(workingDirs).when(outputContext).getWorkDirs();
- doReturn(200 * 1024 * 1024l).when(outputContext).getTotalMemoryAvailableToTask();
+ doReturn(200 * 1024 * 1024L).when(outputContext).getTotalMemoryAvailableToTask();
doReturn(counters).when(outputContext).getCounters();
doReturn(statsReporter).when(outputContext).getStatisticsReporter();
doReturn(new Configuration(false)).when(outputContext).getContainerConfiguration();
@@ -58,7 +61,7 @@ class OutputTestHelpers {
throws IOException {
OutputContext ctx = mock(OutputContext.class);
doAnswer(new Answer<Void>() {
- @Override public Void answer(InvocationOnMock invocation) throws Throwable {
+ @Override public Void answer(InvocationOnMock invocation) {
long requestedSize = (Long) invocation.getArguments()[0];
MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation
.getArguments()[1];
@@ -72,7 +75,7 @@ class OutputTestHelpers {
doReturn("destinationVertex").when(ctx).getDestinationVertexName();
doReturn("UUID").when(ctx).getUniqueIdentifier();
doReturn(new String[] { workingDir.toString() }).when(ctx).getWorkDirs();
- doReturn(200 * 1024 * 1024l).when(ctx).getTotalMemoryAvailableToTask();
+ doReturn(200 * 1024 * 1024L).when(ctx).getTotalMemoryAvailableToTask();
doReturn(new TezCounters()).when(ctx).getCounters();
OutputStatisticsReporter statsReporter = mock(OutputStatisticsReporter.class);
doReturn(statsReporter).when(ctx).getStatisticsReporter();
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
index 960d37107..62fd2bf17 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/KVDataGen.java
@@ -25,10 +25,12 @@ import java.util.Random;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
-public class KVDataGen {
+public final class KVDataGen {
static Random rnd = new Random();
+ private KVDataGen() {}
+
public static List<KVPair> generateTestData(boolean repeatKeys) {
return generateTestData(true, rnd.nextInt(100));
}
@@ -38,7 +40,6 @@ public class KVDataGen {
*
* @param sorted whether data should be sorted by key
* @param repeatCount number of keys to be repeated
- * @return
*/
public static List<KVPair> generateTestData(boolean sorted, int repeatCount) {
return generateTestDataOfKeySize(sorted, 5, repeatCount);
@@ -50,10 +51,9 @@ public class KVDataGen {
* @param sorted whether data should be sorted by key
* @param keys number of keys
* @param repeatCount number of keys to be repeated
- * @return
*/
public static List<KVPair> generateTestDataOfKeySize(boolean sorted, int keys, int repeatCount) {
- List<KVPair> data = new LinkedList<KVPair>();
+ List<KVPair> data = new LinkedList<>();
Random rnd = new Random();
KVPair kvp = null;
for (int i = 0; i < keys; i++) {
@@ -81,8 +81,8 @@ public class KVDataGen {
}
public static class KVPair {
- private Text key;
- private IntWritable value;
+ private final Text key;
+ private final IntWritable value;
public KVPair(Text key, IntWritable value) {
this.key = key;
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RandomTextGenerator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RandomTextGenerator.java
index c1a05d993..d6d15720a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RandomTextGenerator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/testutils/RandomTextGenerator.java
@@ -22,12 +22,14 @@ import org.apache.hadoop.io.Text;
import java.util.Random;
-public class RandomTextGenerator {
+public final class RandomTextGenerator {
static int minWordsInKey = 10;
static int wordsInKeyRange = 100;
static final Random random = new Random();
+ private RandomTextGenerator() {}
+
public static Text generateSentence() {
int noWordsKey = minWordsInKey +
(wordsInKeyRange != 0 ? random.nextInt(wordsInKeyRange) : 0);
@@ -35,10 +37,10 @@ public class RandomTextGenerator {
}
public static Text generateSentence(int noWords) {
- StringBuffer sentence = new StringBuffer();
+ StringBuilder sentence = new StringBuilder();
String space = " ";
for (int i = 0; i < noWords; ++i) {
- sentence.append(words[random.nextInt(words.length)]);
+ sentence.append(WORDS[random.nextInt(WORDS.length)]);
sentence.append(space);
}
return new Text(sentence.toString());
@@ -49,7 +51,7 @@ public class RandomTextGenerator {
* <p/>
* A random list of 100 words from /usr/share/dict/words
*/
- private static String[] words = {
+ private static final String[] WORDS = {
"diurnalness", "Homoiousian",
"spiranthic", "tetragynian",
"silverhead", "ungreat",
diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
index cdbdf13db..cc22f6a4f 100644
--- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
+++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/ExampleDriver.java
@@ -38,11 +38,13 @@ import org.apache.tez.mapreduce.hadoop.MRConfig;
* A description of an example program based on its class and a
* human-readable description.
*/
-public class ExampleDriver {
+public final class ExampleDriver {
private static final DecimalFormat formatter = new DecimalFormat("###.##%");
- public static void main(String argv[]){
+ private ExampleDriver() {}
+
+ public static void main(String[] argv){
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
@@ -110,9 +112,9 @@ public class ExampleDriver {
DAGStatus dagStatus = dagClient.getDAGStatus(
(displayDAGCounters ? opts : null));
Progress progress = dagStatus.getDAGProgress();
- double vProgressFloat = 0.0f;
+ double vProgressFloat;
if (progress != null) {
- System.out.println("");
+ System.out.println();
System.out.println("DAG: State: "
+ dagStatus.getState()
+ " Progress: "
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
index 90a8cf61a..36fad117e 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG.java
@@ -38,11 +38,13 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
* v2
*
*/
-public class SimpleTestDAG {
+public final class SimpleTestDAG {
static Resource defaultResource = Resource.newInstance(100, 0);
public static String TEZ_SIMPLE_DAG_NUM_TASKS =
"tez.simple-test-dag.num-tasks";
public static int TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT = 2;
+
+ private SimpleTestDAG() {}
public static DAG createDAG(String name,
Configuration conf) throws Exception {
@@ -76,10 +78,6 @@ public class SimpleTestDAG {
* v4 v5
* \ /
* v6
- * @param name
- * @param conf
- * @return
- * @throws Exception
*/
public static DAG createDAGForVertexOrder(String name, Configuration conf) throws Exception{
UserPayload payload = UserPayload.create(null);
diff --git a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
index a48b2d696..5873b6697 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/SimpleTestDAG3Vertices.java
@@ -40,11 +40,13 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
* v3
*
*/
-public class SimpleTestDAG3Vertices {
+public final class SimpleTestDAG3Vertices {
static Resource defaultResource = Resource.newInstance(100, 0);
public static String TEZ_SIMPLE_DAG_NUM_TASKS =
"tez.simple-test-dag-3-vertices.num-tasks";
public static int TEZ_SIMPLE_DAG_NUM_TASKS_DEFAULT = 2;
+
+ private SimpleTestDAG3Vertices() {}
public static DAG createDAG(String name,
Configuration conf) throws Exception {
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java b/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
index bf04fd536..6da14dfee 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestDriver.java
@@ -20,9 +20,11 @@ package org.apache.tez.test;
import org.apache.hadoop.util.ProgramDriver;
-public class TestDriver {
+public final class TestDriver {
- public static void main(String argv[]){
+ private TestDriver() {}
+
+ public static void main(String[] argv){
int exitCode = -1;
ProgramDriver pgd = new ProgramDriver();
try {
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
index e2fc53f69..b7ba9dd58 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestFaultTolerance.java
@@ -30,6 +30,7 @@ import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
+import org.apache.tez.test.dag.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,11 +52,6 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.test.dag.SimpleReverseVTestDAG;
-import org.apache.tez.test.dag.SimpleVTestDAG;
-import org.apache.tez.test.dag.SixLevelsFailingDAG;
-import org.apache.tez.test.dag.ThreeLevelsFailingDAG;
-import org.apache.tez.test.dag.TwoLevelsFailingDAG;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Ignore;
@@ -411,21 +407,24 @@ public class TestFaultTolerance {
@Test (timeout=60000)
public void testTwoLevelsFailingDAGSuccess() throws Exception {
Configuration testConf = new Configuration();
- DAG dag = TwoLevelsFailingDAG.createDAG("testTwoLevelsFailingDAGSuccess", testConf);
+ DAG dag = new FailingDagBuilder(FailingDagBuilder.Levels.TWO)
+ .withName("testTwoLevelsFailingDAGSuccess").withConf(testConf).build();
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testThreeLevelsFailingDAGSuccess() throws Exception {
Configuration testConf = new Configuration();
- DAG dag = ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAGSuccess", testConf);
+ DAG dag = new FailingDagBuilder(FailingDagBuilder.Levels.THREE)
+ .withName("testThreeLevelsFailingDAGSuccess").withConf(testConf).build();
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@Test (timeout=60000)
public void testSixLevelsFailingDAGSuccess() throws Exception {
Configuration testConf = new Configuration();
- DAG dag = SixLevelsFailingDAG.createDAG("testSixLevelsFailingDAGSuccess", testConf);
+ DAG dag = new FailingDagBuilder(FailingDagBuilder.Levels.SIX)
+ .withName("testSixLevelsFailingDAGSuccess").withConf(testConf).build();
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@@ -462,8 +461,9 @@ public class TestFaultTolerance {
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_TASK_INDEX, "l3v1"), "0");
testConf.setInt(TestProcessor.getVertexConfName(
TestProcessor.TEZ_FAILING_PROCESSOR_VERIFY_VALUE, "l3v1", 0), 15);
-
- DAG dag = ThreeLevelsFailingDAG.createDAG("testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds", testConf);
+
+ DAG dag = new FailingDagBuilder(FailingDagBuilder.Levels.THREE)
+ .withName("testThreeLevelsFailingDAG2VerticesHaveFailedAttemptsDAGSucceeds").withConf(testConf).build();
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@@ -801,7 +801,8 @@ public class TestFaultTolerance {
Configuration testConf = new Configuration(false);
testConf.setBoolean(TestProcessor.TEZ_FAILING_PROCESSOR_DO_RANDOM_FAIL, true);
testConf.setFloat(TestProcessor.TEZ_FAILING_PROCESSOR_RANDOM_FAIL_PROBABILITY, 0.5f);
- DAG dag = SixLevelsFailingDAG.createDAG("testRandomFailingTasks", testConf);
+ DAG dag = new FailingDagBuilder(FailingDagBuilder.Levels.SIX)
+ .withName("testRandomFailingTasks").withConf(testConf).build();
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
@@ -811,7 +812,8 @@ public class TestFaultTolerance {
Configuration testConf = new Configuration(false);
testConf.setBoolean(TestInput.TEZ_FAILING_INPUT_DO_RANDOM_FAIL, true);
testConf.setFloat(TestInput.TEZ_FAILING_INPUT_RANDOM_FAIL_PROBABILITY, 0.5f);
- DAG dag = SixLevelsFailingDAG.createDAG("testRandomFailingInputs", testConf);
+ DAG dag = new FailingDagBuilder(FailingDagBuilder.Levels.SIX)
+ .withName("testRandomFailingInputs").withConf(testConf).build();
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/FailingDagBuilder.java b/tez-tests/src/test/java/org/apache/tez/test/dag/FailingDagBuilder.java
new file mode 100644
index 000000000..69f7ba811
--- /dev/null
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/FailingDagBuilder.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.test.dag;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.*;
+import org.apache.tez.test.TestInput;
+import org.apache.tez.test.TestOutput;
+import org.apache.tez.test.TestProcessor;
+
+import java.io.IOException;
+import java.util.function.BiConsumer;
+
+/**
+ * A builder for a DAG with vertices divided into a maximum of 6 levels.
+ * Vertex name is "l<level number>v<vertex number>". Level/vertex numbers start at 1.
+ * Each vertex has failing processor and failing inputs.
+ * The builder can accept Tez Configuration to indicate failing patterns.
+ * The number of levels in the built DAG can be configured.
+ * <p>
+ * DAG is shown with a diagram below.
+ * Each vertex has its degree of parallelism indicated in a bracket following its name.
+ * Each edge annotates with data movement (s = scatter/gather, b = broadcast)
+ * <p>
+ * l1v1(1) l1v2(2) l1v3(3) l1v4(2)
+ * |s |s |s |b
+ * | | | |
+ * l2v1(1) l2v2(3) l2v3(2) l2v4(3)
+ * \s /s \b |s /s
+ * \ / \ | /
+ * l3v1(4) l3v2(4)
+ * \s /s
+ * \ /
+ * l4v1 (10)
+ * /s |s \s
+ * / | \
+ * l5v1(2) l5v2(4) l5v3(1)
+ * \s |s /s
+ * \ | /
+ * l6v1(4)
+ *
+ */
+
+public class FailingDagBuilder {
+
+ private final static Resource DEFAULT_RESOURCE = org.apache.hadoop.yarn.api.records.Resource.newInstance(100, 0);
+
+ private final Levels levels;
+ private String name;
+ private Configuration conf;
+
+ public enum Levels {
+ TWO("TwoLevelsFailingDAG", (dag, payload) -> {
+ Vertex l1v1 = Vertex.create("l1v1", TestProcessor.getProcDesc(payload), 1, DEFAULT_RESOURCE);
+ Vertex l2v1 = Vertex.create("l2v1", TestProcessor.getProcDesc(payload), 1, DEFAULT_RESOURCE);
+ addVerticesAndEdgeInternal(dag, l1v1, l2v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l1v2 = Vertex.create("l1v2", TestProcessor.getProcDesc(payload), 2, DEFAULT_RESOURCE);
+ Vertex l2v2 = Vertex.create("l2v2", TestProcessor.getProcDesc(payload), 3, DEFAULT_RESOURCE);
+ addVerticesAndEdgeInternal(dag, l1v2, l2v2, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l1v3 = Vertex.create("l1v3", TestProcessor.getProcDesc(payload), 3, DEFAULT_RESOURCE);
+ Vertex l2v3 = Vertex.create("l2v3", TestProcessor.getProcDesc(payload), 2, DEFAULT_RESOURCE);
+ addVerticesAndEdgeInternal(dag, l1v3, l2v3, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l1v4 = Vertex.create("l1v4", TestProcessor.getProcDesc(payload), 2, DEFAULT_RESOURCE);
+ Vertex l2v4 = Vertex.create("l2v4", TestProcessor.getProcDesc(payload), 3, DEFAULT_RESOURCE);
+ addVerticesAndEdgeInternal(dag, l1v4, l2v4, EdgeProperty.DataMovementType.BROADCAST, payload);
+ }),
+ THREE("ThreeLevelsFailingDAG", (dag, payload) -> {
+ TWO.levelAdder.accept(dag, payload);
+ Vertex l3v1 = Vertex.create("l3v1", TestProcessor.getProcDesc(payload), 4, DEFAULT_RESOURCE);
+ dag.addVertex(l3v1);
+ addEdge(dag, dag.getVertex("l2v1"), l3v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ addEdge(dag, dag.getVertex("l2v2"), l3v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l3v2 = Vertex.create("l3v2", TestProcessor.getProcDesc(payload), 4, DEFAULT_RESOURCE);
+ dag.addVertex(l3v2);
+ addEdge(dag, dag.getVertex("l2v2"), l3v2, EdgeProperty.DataMovementType.BROADCAST, payload);
+ addEdge(dag, dag.getVertex("l2v3"), l3v2, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ addEdge(dag, dag.getVertex("l2v4"), l3v2, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ }),
+ SIX("SixLevelsFailingDAG", (dag, payload) -> {
+ THREE.levelAdder.accept(dag, payload);
+ Vertex l4v1 = Vertex.create("l4v1", TestProcessor.getProcDesc(payload), 10, DEFAULT_RESOURCE);
+ dag.addVertex(l4v1);
+ addEdge(dag, dag.getVertex("l3v1"), l4v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ addEdge(dag, dag.getVertex("l3v2"), l4v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l5v1 = Vertex.create("l5v1", TestProcessor.getProcDesc(payload), 2, DEFAULT_RESOURCE);
+ dag.addVertex(l5v1);
+ addEdge(dag, l4v1, l5v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l5v2 = Vertex.create("l5v2", TestProcessor.getProcDesc(payload), 4, DEFAULT_RESOURCE);
+ dag.addVertex(l5v2);
+ addEdge(dag, l4v1, l5v2, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l5v3 = Vertex.create("l5v3", TestProcessor.getProcDesc(payload), 1, DEFAULT_RESOURCE);
+ dag.addVertex(l5v3);
+ addEdge(dag, l4v1, l5v3, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ Vertex l6v1 = Vertex.create("l6v1", TestProcessor.getProcDesc(payload), 4, DEFAULT_RESOURCE);
+ dag.addVertex(l6v1);
+ addEdge(dag, l5v1, l6v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ addEdge(dag, l5v2, l6v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ addEdge(dag, l5v3, l6v1, EdgeProperty.DataMovementType.SCATTER_GATHER, payload);
+ });
+
+ private final String defaultName;
+ private final BiConsumer<DAG, UserPayload> levelAdder;
+ Levels(String defaultName, BiConsumer<DAG, UserPayload> levelAdder) {
+ this.defaultName = defaultName;
+ this.levelAdder = levelAdder;
+ }
+
+ private static void addVerticesAndEdgeInternal(
+ DAG dag, Vertex v1, Vertex v2, EdgeProperty.DataMovementType dataMovementType, UserPayload payload) {
+ dag.addVertex(v1).addVertex(v2);
+ addEdge(dag, v1, v2, dataMovementType, payload);
+ }
+
+ private static void addEdge(
+ DAG dag, Vertex v1, Vertex v2, EdgeProperty.DataMovementType dataMovementType, UserPayload payload) {
+ dag.addEdge(Edge.create(v1, v2,
+ EdgeProperty.create(dataMovementType,
+ EdgeProperty.DataSourceType.PERSISTED,
+ EdgeProperty.SchedulingType.SEQUENTIAL,
+ TestOutput.getOutputDesc(payload),
+ TestInput.getInputDesc(payload))));
+ }
+ }
+
+ public FailingDagBuilder(Levels levels) {
+ this.levels = levels;
+ this.name = levels.defaultName;
+ }
+
+ public FailingDagBuilder withConf(Configuration config) {
+ conf = config;
+ return this;
+ }
+
+ public FailingDagBuilder withName(String dagName) {
+ name = dagName;
+ return this;
+ }
+
+ public DAG build() throws IOException {
+ UserPayload payload = UserPayload.create(null);
+ if (conf != null) {
+ payload = TezUtils.createUserPayloadFromConf(conf);
+ }
+
+ DAG dag = DAG.create(name);
+
+ levels.levelAdder.accept(dag, payload);
+
+ return dag;
+ }
+}
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
index cdf69e69b..f2d02727b 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java
@@ -66,7 +66,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
-public class MultiAttemptDAG {
+public final class MultiAttemptDAG {
private static final Logger LOG =
LoggerFactory.getLogger(MultiAttemptDAG.class);
@@ -80,9 +80,11 @@ public class MultiAttemptDAG {
"tez.multi-attempt-dag.use-failing-committer";
public static boolean MULTI_ATTEMPT_DAG_USE_FAILING_COMMITTER_DEFAULT = false;
+ private MultiAttemptDAG() {}
+
public static class FailOnAttemptVertexManagerPlugin extends VertexManagerPlugin {
private int numSourceTasks = 0;
- private AtomicInteger numCompletions = new AtomicInteger();
+ private final AtomicInteger numCompletions = new AtomicInteger();
private boolean tasksScheduled = false;
public FailOnAttemptVertexManagerPlugin(VertexManagerPluginContext context) {
@@ -114,7 +116,7 @@ public class MultiAttemptDAG {
&& !tasksScheduled) {
tasksScheduled = true;
String payload = new String(getContext().getUserPayload().deepCopyAsArray());
- int successAttemptId = Integer.valueOf(payload);
+ int successAttemptId = Integer.parseInt(payload);
LOG.info("Checking whether to crash AM or schedule tasks"
+ ", vertex: " + getContext().getVertexName()
+ ", successfulAttemptID=" + successAttemptId
@@ -150,7 +152,7 @@ public class MultiAttemptDAG {
@Override
public void onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) {
- List<InputDataInformationEvent> inputInfoEvents = new ArrayList<InputDataInformationEvent>();
+ List<InputDataInformationEvent> inputInfoEvents = new ArrayList<>();
for (Event event: events) {
if (event instanceof InputDataInformationEvent) {
inputInfoEvents.add((InputDataInformationEvent)event);
@@ -178,12 +180,12 @@ public class MultiAttemptDAG {
}
@Override
- public void setupOutput() throws Exception {
+ public void setupOutput() {
}
@Override
- public void commitOutput() throws Exception {
+ public void commitOutput() {
if (failOnCommit) {
LOG.info("Committer causing AM to shutdown");
Runtime.getRuntime().halt(-1);
@@ -191,7 +193,7 @@ public class MultiAttemptDAG {
}
@Override
- public void abortOutput(State finalState) throws Exception {
+ public void abortOutput(State finalState) {
}
@@ -212,11 +214,7 @@ public class MultiAttemptDAG {
public void fromUserPayload(byte[] userPayload) {
int failInt = Ints.fromByteArray(userPayload);
- if (failInt == 0) {
- failOnCommit = false;
- } else {
- failOnCommit = true;
- }
+ failOnCommit = failInt != 0;
}
}
}
@@ -229,14 +227,13 @@ public class MultiAttemptDAG {
@Override
public List<Event> initialize() throws Exception {
- List<Event> events = new ArrayList<Event>();
+ List<Event> events = new ArrayList<>();
events.add(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)));
return events;
}
@Override
- public void handleInputInitializerEvent(List<InputInitializerEvent> events)
- throws Exception {
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events) {
throw new UnsupportedOperationException("Not supported");
}
}
@@ -250,7 +247,7 @@ public class MultiAttemptDAG {
@Override
public List<Event> initialize() throws Exception {
try {
- Thread.sleep(2000l);
+ Thread.sleep(2000L);
} catch (InterruptedException e) {
// Ignore
}
@@ -262,8 +259,7 @@ public class MultiAttemptDAG {
}
@Override
- public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws
- Exception {
+ public void handleInputInitializerEvent(List<InputInitializerEvent> events) {
throw new UnsupportedOperationException("Not supported");
}
}
@@ -276,7 +272,7 @@ public class MultiAttemptDAG {
@Override
public List<Event> initialize() throws Exception {
- getContext().requestInitialMemory(1l, new MemoryUpdateCallback() {
+ getContext().requestInitialMemory(1L, new MemoryUpdateCallback() {
@Override
public void memoryAssigned(long assignedSize) {}
});
@@ -289,12 +285,12 @@ public class MultiAttemptDAG {
}
@Override
- public Reader getReader() throws Exception {
+ public Reader getReader() {
return null;
}
@Override
- public void handleEvents(List<Event> inputEvents) throws Exception {
+ public void handleEvents(List<Event> inputEvents) {
}
@@ -313,7 +309,7 @@ public class MultiAttemptDAG {
@Override
public List<Event> initialize() throws Exception {
- getContext().requestInitialMemory(1l, new MemoryUpdateCallback() {
+ getContext().requestInitialMemory(1L, new MemoryUpdateCallback() {
@Override
public void memoryAssigned(long assignedSize) {
}
@@ -327,7 +323,7 @@ public class MultiAttemptDAG {
}
@Override
- public Writer getWriter() throws Exception {
+ public Writer getWriter() {
return null;
}
@@ -361,13 +357,13 @@ public class MultiAttemptDAG {
// Make each vertex manager fail on appropriate attempt
v1.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
FailOnAttemptVertexManagerPlugin.class.getName())
- .setUserPayload(UserPayload.create(ByteBuffer.wrap(new String("1").getBytes()))));
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap("1".getBytes()))));
v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
FailOnAttemptVertexManagerPlugin.class.getName())
- .setUserPayload(UserPayload.create(ByteBuffer.wrap(new String("2").getBytes()))));
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap("2".getBytes()))));
v3.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
FailOnAttemptVertexManagerPlugin.class.getName())
- .setUserPayload(UserPayload.create(ByteBuffer.wrap(new String("3").getBytes()))));
+ .setUserPayload(UserPayload.create(ByteBuffer.wrap("3".getBytes()))));
dag.addVertex(v1).addVertex(v2).addVertex(v3);
dag.addEdge(Edge.create(v1, v2,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
index 4c8771cdb..29d4d0b97 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleReverseVTestDAG.java
@@ -40,11 +40,13 @@ import org.apache.tez.test.TestProcessor;
* v2 v3
*
*/
-public class SimpleReverseVTestDAG {
+public final class SimpleReverseVTestDAG {
static Resource defaultResource = Resource.newInstance(100, 0);
public static String TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS =
"tez.simple-reverse-v-test-dag.num-tasks";
public static int TEZ_SIMPLE_REVERSE_V_DAG_NUM_TASKS_DEFAULT = 2;
+
+ private SimpleReverseVTestDAG() {}
public static DAG createDAG(String name,
Configuration conf) throws Exception {
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
index a4eb95e05..c6e815fa5 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/dag/SimpleVTestDAG.java
@@ -40,11 +40,13 @@ import org.apache.tez.test.TestProcessor;
* v3
*
*/
-public class SimpleVTestDAG {
+public final class SimpleVTestDAG {
static Resource defaultResource = Resource.newInstance(100, 0);
public static String TEZ_SIMPLE_V_DAG_NUM_TASKS =
"tez.simple-v-test-dag.num-tasks";
public static int TEZ_SIMPLE_V_DAG_NUM_TASKS_DEFAULT = 2;
+
+ private SimpleVTestDAG() {}
public static DAG createDAG(String name,
Configuration conf) throws Exception {
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
deleted file mode 100644
index 036bedfdb..000000000
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/SixLevelsFailingDAG.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.test.dag;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.test.TestProcessor;
-
-/**
- * A DAG with vertices divided into 6 levels.
- * Vertex name is "l<level number>v<vertex number>". Level/vertex numbers start at 1.
- * Each vertex has failing processor and failing inputs. The constructor can accept Tez Configuration to indicate failing patterns.
- *
- * DAG is shown with a diagram below.
- * Each vertex has its degree of parallelism indicated in a bracket following its name.
- * Each edge annotates with data movement (s = scatter/gather, b = broadcast)
- *
- * l1v1(1) l1v2(2) l1v3(3) l1v4(2)
- * |s |s |s |b
- * | | | |
- * l2v1(1) l2v2(3) l2v3(2) l2v4(3)
- * \s /s \b |s /s
- * \ / \ | /
- * l3v1(4) l3v2(4)
- * \s /s
- * \ /
- * l4v1 (10)
- * /s |s \s
- * / | \
- * l5v1(2) l5v2(4) l5v3(1)
- * \s |s /s
- * \ | /
- * l6v1(4)
- *
- */
-public class SixLevelsFailingDAG extends ThreeLevelsFailingDAG {
-
- protected static Vertex l4v1;
- protected static Vertex l5v1, l5v2, l5v3;
- protected static Vertex l6v1;
-
- protected static void addDAGVerticesAndEdges() {
- ThreeLevelsFailingDAG.addDAGVerticesAndEdges();
- l4v1 = Vertex.create("l4v1", TestProcessor.getProcDesc(payload), 10, defaultResource);
- dag.addVertex(l4v1);
- addEdge(l3v1, l4v1, DataMovementType.SCATTER_GATHER);
- addEdge(l3v2, l4v1, DataMovementType.SCATTER_GATHER);
- l5v1 = Vertex.create("l5v1", TestProcessor.getProcDesc(payload), 2, defaultResource);
- dag.addVertex(l5v1);
- addEdge(l4v1, l5v1, DataMovementType.SCATTER_GATHER);
- l5v2 = Vertex.create("l5v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
- dag.addVertex(l5v2);
- addEdge(l4v1, l5v2, DataMovementType.SCATTER_GATHER);
- l5v3 = Vertex.create("l5v3", TestProcessor.getProcDesc(payload), 1, defaultResource);
- dag.addVertex(l5v3);
- addEdge(l4v1, l5v3, DataMovementType.SCATTER_GATHER);
- l6v1 = Vertex.create("l6v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
- dag.addVertex(l6v1);
- addEdge(l5v1, l6v1, DataMovementType.SCATTER_GATHER);
- addEdge(l5v2, l6v1, DataMovementType.SCATTER_GATHER);
- addEdge(l5v3, l6v1, DataMovementType.SCATTER_GATHER);
- }
-
- public static DAG createDAG(String name,
- Configuration conf) throws Exception {
- if (conf != null) {
- payload = TezUtils.createUserPayloadFromConf(conf);
- }
- dag = DAG.create(name);
- addDAGVerticesAndEdges();
- return dag;
- }
-
- public static DAG createDAG(Configuration conf) throws Exception {
- return createDAG("SixLevelsFailingDAG", conf);
- }
-}
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
deleted file mode 100644
index 7f2e4f8ec..000000000
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/ThreeLevelsFailingDAG.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.test.dag;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.test.TestProcessor;
-
-/**
- * A DAG with vertices divided into 3 levels.
- * Vertex name is "l<level number>v<vertex number>". Level/vertex numbers start at 1.
- * Each vertex has failing processor and failing inputs. The constructor can accept Tez Configuration to indicate failing patterns.
- *
- * DAG is shown with a diagram below.
- * Each vertex has its degree of parallelism indicated in a bracket following its name.
- * Each edge annotates with data movement (s = scatter/gather, b = broadcast)
- *
- * l1v1(1) l1v2(2) l1v3(3) l1v4(2)
- * |s |s |s |b
- * | | | |
- * l2v1(1) l2v2(3) l2v3(2) l2v4(3)
- * \s /s \b |s /s
- * \ / \ | /
- * l3v1(4) l3v2(4)
- *
- */
-public class ThreeLevelsFailingDAG extends TwoLevelsFailingDAG {
-
- protected static Vertex l3v1, l3v2;
-
- protected static void addDAGVerticesAndEdges() {
- TwoLevelsFailingDAG.addDAGVerticesAndEdges();
- l3v1 = Vertex.create("l3v1", TestProcessor.getProcDesc(payload), 4, defaultResource);
- dag.addVertex(l3v1);
- addEdge(l2v1, l3v1, DataMovementType.SCATTER_GATHER);
- addEdge(l2v2, l3v1, DataMovementType.SCATTER_GATHER);
- l3v2 = Vertex.create("l3v2", TestProcessor.getProcDesc(payload), 4, defaultResource);
- dag.addVertex(l3v2);
- addEdge(l2v2, l3v2, DataMovementType.BROADCAST);
- addEdge(l2v3, l3v2, DataMovementType.SCATTER_GATHER);
- addEdge(l2v4, l3v2, DataMovementType.SCATTER_GATHER);
- }
-
- public static DAG createDAG(String name,
- Configuration conf) throws Exception {
- if (conf != null) {
- payload = TezUtils.createUserPayloadFromConf(conf);
- }
- dag = DAG.create(name);
- addDAGVerticesAndEdges();
- return dag;
- }
-
- public static DAG createDAG(Configuration conf) throws Exception {
- return createDAG("ThreeLevelsFailingDAG", conf);
- }
-}
diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
deleted file mode 100644
index 151e3855e..000000000
--- a/tez-tests/src/test/java/org/apache/tez/test/dag/TwoLevelsFailingDAG.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.test.dag;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.UserPayload;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
-import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
-import org.apache.tez.test.TestInput;
-import org.apache.tez.test.TestOutput;
-import org.apache.tez.test.TestProcessor;
-
-/**
- * A DAG with vertices divided into 2 levels.
- * Vertex name is "l<level number>v<vertex number>". Level/vertex numbers start at 1.
- * Each vertex has failing processor and failing inputs. The constructor can accept Tez Configuration to indicate failing patterns.
- *
- * DAG is shown with a diagram below.
- * Each vertex has its degree of parallelism indicated in a bracket following its name.
- * Each edge annotates with data movement (s = scatter/gather, b = broadcast)
- *
- * l1v1(1) l1v2(2) l1v3(3) l1v4(2)
- * |s |s |s |b
- * | | | |
- * l2v1(1) l2v2(3) l2v3(2) l2v4(3)
- *
- */
-public class TwoLevelsFailingDAG {
- static Resource defaultResource = Resource.newInstance(100, 0);
- protected static DAG dag;
- protected static UserPayload payload = UserPayload.create(null);
- protected static Vertex l1v1, l1v2, l1v3, l1v4;
- protected static Vertex l2v1, l2v2, l2v3, l2v4;
-
- public static DAG createDAG(String name,
- Configuration conf) throws Exception {
- if (conf != null) {
- payload = TezUtils.createUserPayloadFromConf(conf);
- }
- dag = DAG.create(name);
- addDAGVerticesAndEdges();
- return dag;
- }
-
- protected static void addDAGVerticesAndEdges() {
- l1v1 = Vertex.create("l1v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
- l2v1 = Vertex.create("l2v1", TestProcessor.getProcDesc(payload), 1, defaultResource);
- addVerticesAndEdgeInternal(l1v1, l2v1, DataMovementType.SCATTER_GATHER);
- l1v2 = Vertex.create("l1v2", TestProcessor.getProcDesc(payload), 2, defaultResource);
- l2v2 = Vertex.create("l2v2", TestProcessor.getProcDesc(payload), 3, defaultResource);
- addVerticesAndEdgeInternal(l1v2, l2v2, DataMovementType.SCATTER_GATHER);
- l1v3 = Vertex.create("l1v3", TestProcessor.getProcDesc(payload), 3, defaultResource);
- l2v3 = Vertex.create("l2v3", TestProcessor.getProcDesc(payload), 2, defaultResource);
- addVerticesAndEdgeInternal(l1v3, l2v3, DataMovementType.SCATTER_GATHER);
- l1v4 = Vertex.create("l1v4", TestProcessor.getProcDesc(payload), 2, defaultResource);
- l2v4 = Vertex.create("l2v4", TestProcessor.getProcDesc(payload), 3, defaultResource);
- addVerticesAndEdgeInternal(l1v4, l2v4, DataMovementType.BROADCAST);
- }
-
- /**
- * Adds 2 vertices and an edge connecting them.
- * Given two vertices must not exist.
- *
- * @param v1 vertice 1
- * @param v2 vertice 2
- * @param dataMovementType Data movement type
- */
- protected static void addVerticesAndEdgeInternal(Vertex v1, Vertex v2, DataMovementType dataMovementType) {
- dag.addVertex(v1).addVertex(v2);
- addEdge(v1, v2, dataMovementType);
- }
-
- /**
- * Adds an edge to given 2 vertices.
- * @param v1 vertice 1
- * @param v2 vertice 2
- * @param dataMovementType Data movement type
- */
- protected static void addEdge(Vertex v1, Vertex v2, DataMovementType dataMovementType) {
- dag.addEdge(Edge.create(v1, v2,
- EdgeProperty.create(dataMovementType,
- DataSourceType.PERSISTED,
- SchedulingType.SEQUENTIAL,
- TestOutput.getOutputDesc(payload),
- TestInput.getInputDesc(payload))));
- }
-
- public static DAG createDAG(Configuration conf) throws Exception {
- return createDAG("TwoLevelsFailingDAG", conf);
- }
-}
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
index 294527cd3..9eda46294 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java
@@ -20,7 +20,9 @@ package org.apache.tez.analyzer.plugins;
import org.apache.hadoop.util.ProgramDriver;
-public class AnalyzerDriver {
+public final class AnalyzerDriver {
+
+ private AnalyzerDriver() {}
public static void main(String argv[]){
int exitCode = -1;
diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
index 8bcf2650f..3b3e63946 100644
--- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
+++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/utils/Utils.java
@@ -30,10 +30,12 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class Utils {
+public final class Utils {
private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+ private Utils() {}
+
public static String getShortClassName(String className) {
int pos = className.lastIndexOf(".");
if (pos != -1 && pos < className.length() - 1) {
diff --git a/tez-tools/tez-javadoc-tools/src/main/java/org/apache/tez/tools/javadoc/doclet/ConfigStandardDoclet.java b/tez-tools/tez-javadoc-tools/src/main/java/org/apache/tez/tools/javadoc/doclet/ConfigStandardDoclet.java
index 6cc9ced86..0713467c7 100644
--- a/tez-tools/tez-javadoc-tools/src/main/java/org/apache/tez/tools/javadoc/doclet/ConfigStandardDoclet.java
+++ b/tez-tools/tez-javadoc-tools/src/main/java/org/apache/tez/tools/javadoc/doclet/ConfigStandardDoclet.java
@@ -19,7 +19,6 @@
package org.apache.tez.tools.javadoc.doclet;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -41,11 +40,13 @@ import com.sun.javadoc.LanguageVersion;
import com.sun.javadoc.RootDoc;
import com.sun.tools.doclets.standard.Standard;
-public class ConfigStandardDoclet {
+public final class ConfigStandardDoclet {
private static final String DEBUG_SWITCH = "-debug";
private static boolean debugMode = false;
+ private ConfigStandardDoclet() {}
+
public static LanguageVersion languageVersion() {
return LanguageVersion.JAVA_1_5;
}
@@ -63,14 +64,15 @@ public class ConfigStandardDoclet {
for (String opt : opts) {
if (opt.equals(DEBUG_SWITCH)) {
debugMode = true;
+ break;
}
}
}
logMessage("Running doclet " + ConfigStandardDoclet.class.getSimpleName());
ClassDoc[] classes = root.classes();
- for (int i = 0; i < classes.length; ++i) {
- processDoc(classes[i]);
+ for (ClassDoc aClass : classes) {
+ processDoc(aClass);
}
return true;
@@ -184,11 +186,9 @@ public class ConfigStandardDoclet {
ConfigurationProperty.class.getCanonicalName())) {
configProperty.isValidConfigProp = true;
- boolean foundType = false;
for (ElementValuePair element : annotationDesc.elementValues()) {
if (element.element().name().equals("type")) {
configProperty.type = stripQuotes(element.value().toString());
- foundType = true;
} else {
logMessage("Unhandled annotation property: " + element.element().name());
}