You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2019/06/17 17:49:33 UTC
[hive] branch master updated: HIVE-21864:
LlapBaseInputFormat#closeAll() throws ConcurrentModificationException
(Shubham Chaurasia, reviewed by Jason Dere)
This is an automated email from the ASF dual-hosted git repository.
jdere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e655579 HIVE-21864: LlapBaseInputFormat#closeAll() throws ConcurrentModificationException (Shubham Chaurasia, reviewed by Jason Dere)
e655579 is described below
commit e655579f29de4d6d2a02a08efd9826a46ad163a8
Author: Jason Dere <jd...@cloudera.com>
AuthorDate: Mon Jun 17 10:48:56 2019 -0700
HIVE-21864: LlapBaseInputFormat#closeAll() throws ConcurrentModificationException (Shubham Chaurasia, reviewed by Jason Dere)
---
.../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java | 4 +-
.../hive/jdbc/TestJdbcWithMiniLlapArrow.java | 89 ++++++++++++++++++++++
.../hadoop/hive/llap/LlapBaseInputFormat.java | 17 +++--
3 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index d2e9514..ab79b42 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -106,7 +106,7 @@ public abstract class BaseJdbcWithMiniLlap {
private static Path kvDataFilePath;
private static Path dataTypesFilePath;
- private static HiveConf conf = null;
+ protected static HiveConf conf = null;
private static Connection hs2Conn = null;
// This method should be called by sub-classes in a @BeforeClass initializer
@@ -160,7 +160,7 @@ public abstract class BaseJdbcWithMiniLlap {
}
}
- private void createTestTable(String tableName) throws Exception {
+ protected void createTestTable(String tableName) throws Exception {
createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index b69a2f9..1b088e2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -21,12 +21,19 @@ package org.apache.hive.jdbc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import java.math.BigDecimal;
+
+import com.google.common.collect.Iterables;
import org.apache.hadoop.hive.common.type.Date;
import org.apache.hadoop.hive.common.type.Timestamp;
+
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
import org.apache.hadoop.hive.llap.Row;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
import org.junit.BeforeClass;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -35,6 +42,9 @@ import org.junit.Test;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Connection;
+import java.util.concurrent.Callable;
+import java.util.stream.IntStream;
+
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
@@ -427,5 +437,84 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
assertNotNull("tExecute", tExecuteHolder.throwable);
assertNull("tCancel", tKillHolder.throwable);
}
+
+  /**
+   * Concurrency test for HIVE-21864: runs three threads against the same set of handle IDs.
+   * One keeps requesting splits for a rotating handle ID, one closes a single handle at a time via
+   * {@code LlapBaseInputFormat.close(handleId)}, and one calls
+   * {@code LlapBaseInputFormat.closeAll()}. The test passes when none of the threads recorded a
+   * failure in the shared {@code ExceptionHolder}.
+   */
+  @Test
+  public void testConcurrentAddAndCloseAndCloseAllConnections() throws Exception {
+    createTestTable("testtab1");
+
+    String url = miniHS2.getJdbcURL();
+    String user = System.getProperty("user.name");
+    String pwd = user;
+
+    InputFormat<NullWritable, Row> inputFormat = getInputFormat();
+
+    // Job configuration used by the adder thread when it asks for splits
+    JobConf job = new JobConf(conf);
+    job.set(LlapBaseInputFormat.URL_KEY, url);
+    job.set(LlapBaseInputFormat.USER_KEY, user);
+    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+    job.set(LlapBaseInputFormat.QUERY_KEY, "select * from testtab1");
+
+    // mapToObj builds the strings directly from the int range, no Integer boxing needed
+    final String[] handleIds =
+        IntStream.range(0, 20).mapToObj(i -> "handleId-" + i).toArray(String[]::new);
+
+    final ExceptionHolder exceptionHolder = new ExceptionHolder();
+
+    // addConnThread keeps adding connections
+    // closeConnThread tries to close the connection(s) associated with handleIds, one at a time
+    // closeAllConnThread tries to close all of them at once
+
+    final int numIterations = 100;
+    // Each thread cycles endlessly over the same handle IDs through its own iterator
+    final Iterator<String> addConnIterator = Iterables.cycle(handleIds).iterator();
+    Thread addConnThread = new Thread(() -> executeNTimes(() -> {
+      String handleId = addConnIterator.next();
+      job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+      InputSplit[] splits = inputFormat.getSplits(job, 1);
+      assertTrue(splits.length > 0);
+      return null;
+    }, numIterations, 1, exceptionHolder));
+
+    final Iterator<String> removeConnIterator = Iterables.cycle(handleIds).iterator();
+    Thread closeConnThread = new Thread(() -> executeNTimes(() -> {
+      String handleId = removeConnIterator.next();
+      LlapBaseInputFormat.close(handleId);
+      return null;
+    }, numIterations, 2, exceptionHolder));
+
+    Thread closeAllConnThread = new Thread(() -> executeNTimes(() -> {
+      LlapBaseInputFormat.closeAll();
+      return null;
+    }, numIterations, 5, exceptionHolder));
+
+    addConnThread.start();
+    closeConnThread.start();
+    closeAllConnThread.start();
+
+    // join() also makes the workers' writes to exceptionHolder visible to this thread
+    closeAllConnThread.join();
+    closeConnThread.join();
+    addConnThread.join();
+
+    Throwable throwable = exceptionHolder.throwable;
+    assertNull("Something went wrong while testAddCloseCloseAllConnections: " + throwable, throwable);
+
+  }
+
+  /**
+   * Invokes {@code action} {@code noOfTimes} times, sleeping {@code intervalMillis} milliseconds
+   * after each successful call, and records the first failure in {@code exceptionHolder}.
+   *
+   * <p>Any {@link Throwable} is recorded, not just {@link Exception}: the callers' lambdas use
+   * JUnit assertions, which fail with {@link AssertionError}, and an uncaught error would end the
+   * worker thread without the test ever noticing.
+   *
+   * @param action the work to repeat; its return value is ignored
+   * @param noOfTimes number of invocations
+   * @param intervalMillis pause after each invocation, in milliseconds
+   * @param exceptionHolder shared holder that receives the first failure seen by any thread
+   */
+  private void executeNTimes(Callable<?> action, int noOfTimes, long intervalMillis,
+      ExceptionHolder exceptionHolder) {
+    for (int i = 0; i < noOfTimes; i++) {
+      try {
+        action.call();
+        Thread.sleep(intervalMillis);
+      } catch (InterruptedException e) {
+        recordFirstFailure(exceptionHolder, e);
+        // Restore the interrupt flag and stop: further sleeps would only fail again immediately
+        Thread.currentThread().interrupt();
+        return;
+      } catch (Throwable t) {
+        // Thread boundary: catch everything (including AssertionError) so it can be reported
+        recordFirstFailure(exceptionHolder, t);
+      }
+    }
+  }
+
+  /** Stores {@code failure} in {@code holder} unless an earlier failure is already recorded. */
+  private static void recordFirstFailure(ExceptionHolder holder, Throwable failure) {
+    // Lock on the holder itself, since it is the object all worker threads share
+    synchronized (holder) {
+      if (holder.throwable == null) {
+        holder.throwable = failure;
+      }
+    }
+  }
+
}
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index ff91f47..2aa82b5 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -31,6 +31,7 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -326,6 +327,10 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
synchronized (lock) {
handleConnections = connectionMap.remove(handleId);
}
+ closeConnections(handleId, handleConnections);
+ }
+
+ private static void closeConnections(String handleId, List<Connection> handleConnections) {
if (handleConnections != null) {
LOG.debug("Closing {} connections for handle ID {}", handleConnections.size(), handleId);
for (Connection conn : handleConnections) {
@@ -345,11 +350,13 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
*/
public static void closeAll() {
LOG.debug("Closing all handles");
- for (String handleId : connectionMap.keySet()) {
- try {
- close(handleId);
- } catch (Exception err) {
- LOG.error("Error closing handle ID " + handleId, err);
+    // Detach every registered handle while holding the lock, then close the connections after
+    // releasing it. This mirrors close(handleId), which also removes under the lock and closes
+    // outside it, so slow JDBC close calls do not block threads registering new connections.
+    final Map<String, List<Connection>> drained;
+    synchronized (lock) {
+      drained = new HashMap<>(connectionMap);
+      connectionMap.clear();
+    }
+    for (Map.Entry<String, List<Connection>> handle : drained.entrySet()) {
+      try {
+        closeConnections(handle.getKey(), handle.getValue());
+      } catch (Exception err) {
+        // Best effort: a failure on one handle must not prevent closing the remaining ones
+        LOG.error("Error closing handle ID " + handle.getKey(), err);
}
}
}