You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dm...@apache.org on 2019/10/21 22:48:04 UTC
[hive] branch master updated: HIVE-21426: Remove Utilities Global Random (David Mollitor, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
dmollitor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 40cd40d HIVE-21426: Remove Utilities Global Random (David Mollitor, reviewed by Peter Vary)
40cd40d is described below
commit 40cd40d5e212860868d7dd15335c0085568416ce
Author: David Mollitor <dm...@apache.org>
AuthorDate: Mon Oct 21 18:46:42 2019 -0400
HIVE-21426: Remove Utilities Global Random (David Mollitor, reviewed by Peter Vary)
---
.../hadoop/hive/ql/exec/SparkHashTableSinkOperator.java | 3 ++-
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java | 15 +++++++++------
.../org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java | 10 +++++++---
.../org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java | 4 +++-
.../hive/ql/io/rcfile/truncate/ColumnTruncateTask.java | 5 +++--
.../ql/parse/spark/SparkPartitionPruningSinkOperator.java | 5 +++--
6 files changed, 27 insertions(+), 15 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 78ae9a1..10144a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@ -22,6 +22,7 @@ import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.io.FileExistsException;
import org.apache.hadoop.conf.Configuration;
@@ -140,7 +141,7 @@ public class SparkHashTableSinkOperator
fs.mkdirs(path); // Create the folder and its parents if not there
while (true) {
path = new Path(path, getOperatorId()
- + "-" + Math.abs(Utilities.randGen.nextInt()));
+ + "-" + Math.abs(ThreadLocalRandom.current().nextInt()));
try {
// This will guarantee file name uniqueness.
if (fs.createNewFile(path)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 7fd42c1..a7770b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -67,6 +67,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -257,8 +258,6 @@ public final class Utilities {
@Deprecated
protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max";
- public static Random randGen = new Random();
-
private static final Object INPUT_SUMMARY_LOCK = new Object();
private static final Object ROOT_HDFS_DIR_LOCK = new Object();
@@ -751,7 +750,8 @@ public final class Utilities {
public static String getTaskId(Configuration hconf) {
String taskid = (hconf == null) ? null : hconf.get("mapred.task.id");
if (StringUtils.isEmpty(taskid)) {
- return (Integer.toString(randGen.nextInt(Integer.MAX_VALUE)));
+ return (Integer
+ .toString(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
} else {
/*
* extract the task and attempt id from the hadoop taskid. in version 17 the leading component
@@ -2894,7 +2894,8 @@ public final class Utilities {
if (failures >= maxRetries) {
throw e;
}
- long waitTime = getRandomWaitTime(baseWindow, failures, randGen);
+ long waitTime = getRandomWaitTime(baseWindow, failures,
+ ThreadLocalRandom.current());
try {
Thread.sleep(waitTime);
} catch (InterruptedException iex) {
@@ -2933,7 +2934,8 @@ public final class Utilities {
LOG.error("Error during JDBC connection.", e);
throw e;
}
- long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, randGen);
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures,
+ ThreadLocalRandom.current());
try {
Thread.sleep(waitTime);
} catch (InterruptedException e1) {
@@ -2972,7 +2974,8 @@ public final class Utilities {
LOG.error("Error preparing JDBC Statement {}", stmt, e);
throw e;
}
- long waitTime = Utilities.getRandomWaitTime(waitWindow, failures, randGen);
+ long waitTime = Utilities.getRandomWaitTime(waitWindow, failures,
+ ThreadLocalRandom.current());
try {
Thread.sleep(waitTime);
} catch (InterruptedException e1) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index ab1b52e..cd4f2a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -31,6 +31,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.AddToClassPathAction;
@@ -308,7 +309,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
if (noName) {
// This is for a special case to ensure unit tests pass
- job.set(MRJobConfig.JOB_NAME, "JOB" + Utilities.randGen.nextInt());
+ job.set(MRJobConfig.JOB_NAME,
+ "JOB" + ThreadLocalRandom.current().nextInt());
}
try{
@@ -807,8 +809,10 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
// working dirs and system dirs
// Workaround is to rename map red working dir to a temp dir in such cases
if (hadoopLocalMode) {
- tempConf.set(hadoopSysDir, hconf.get(hadoopSysDir) + "/" + Utilities.randGen.nextInt());
- tempConf.set(hadoopWorkDir, hconf.get(hadoopWorkDir) + "/" + Utilities.randGen.nextInt());
+ tempConf.set(hadoopSysDir, hconf.get(hadoopSysDir) + "/"
+ + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
+ tempConf.set(hadoopWorkDir, hconf.get(hadoopWorkDir) + "/"
+ + ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
}
try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index acc52af..7eb2ef3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig;
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.ThreadLocalRandom;
/**
* Task for fast merging of ORC and RC files.
@@ -124,7 +125,8 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
if (noName) {
// This is for a special case to ensure unit tests pass
job.set(MRJobConfig.JOB_NAME,
- jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt());
+ jobName != null ? jobName
+ : "JOB" + ThreadLocalRandom.current().nextInt());
}
// add input path
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 8f21f7c..17b5b4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.rcfile.truncate;
import java.io.IOException;
import java.io.Serializable;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
@@ -157,8 +158,8 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
if (noName) {
// This is for a special case to ensure unit tests pass
- job.set(MRJobConfig.JOB_NAME,
- jobName != null ? jobName : "JOB" + Utilities.randGen.nextInt());
+ job.set(MRJobConfig.JOB_NAME, jobName != null ? jobName
+ : "JOB" + ThreadLocalRandom.current().nextInt());
}
try {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index 1de7a45..1334a26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -161,7 +161,8 @@ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPr
fs.mkdirs(path);
while (true) {
- path = new Path(path, String.valueOf(Utilities.randGen.nextInt()));
+ path = new Path(path, String
+ .valueOf(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE)));
if (!fs.exists(path)) {
break;
}