You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2019/06/17 17:49:33 UTC

[hive] branch master updated: HIVE-21864: LlapBaseInputFormat#closeAll() throws ConcurrentModificationException (Shubham Chaurasia, reviewed by Jason Dere)

This is an automated email from the ASF dual-hosted git repository.

jdere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e655579  HIVE-21864: LlapBaseInputFormat#closeAll() throws ConcurrentModificationException (Shubham Chaurasia, reviewed by Jason Dere)
e655579 is described below

commit e655579f29de4d6d2a02a08efd9826a46ad163a8
Author: Jason Dere <jd...@cloudera.com>
AuthorDate: Mon Jun 17 10:48:56 2019 -0700

    HIVE-21864: LlapBaseInputFormat#closeAll() throws ConcurrentModificationException (Shubham Chaurasia, reviewed by Jason Dere)
---
 .../org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java |  4 +-
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java       | 89 ++++++++++++++++++++++
 .../hadoop/hive/llap/LlapBaseInputFormat.java      | 17 +++--
 3 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
index d2e9514..ab79b42 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/BaseJdbcWithMiniLlap.java
@@ -106,7 +106,7 @@ public abstract class BaseJdbcWithMiniLlap {
   private static Path kvDataFilePath;
   private static Path dataTypesFilePath;
 
-  private static HiveConf conf = null;
+  protected static HiveConf conf = null;
   private static Connection hs2Conn = null;
 
   // This method should be called by sub-classes in a @BeforeClass initializer
@@ -160,7 +160,7 @@ public abstract class BaseJdbcWithMiniLlap {
     }
   }
 
-  private void createTestTable(String tableName) throws Exception {
+  protected void createTestTable(String tableName) throws Exception {
     createTestTable(hs2Conn, null, tableName, kvDataFilePath.toString());
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index b69a2f9..1b088e2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -21,12 +21,19 @@ package org.apache.hive.jdbc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
 import java.math.BigDecimal;
+
+import com.google.common.collect.Iterables;
 import org.apache.hadoop.hive.common.type.Date;
 import org.apache.hadoop.hive.common.type.Timestamp;
+
+import java.util.Iterator;
 import java.util.List;
 import org.apache.hadoop.hive.llap.FieldDesc;
+import org.apache.hadoop.hive.llap.LlapBaseInputFormat;
 import org.apache.hadoop.hive.llap.Row;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.junit.BeforeClass;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -35,6 +42,9 @@ import org.junit.Test;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Connection;
+import java.util.concurrent.Callable;
+import java.util.stream.IntStream;
+
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
@@ -427,5 +437,84 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     assertNotNull("tExecute", tExecuteHolder.throwable);
     assertNull("tCancel", tKillHolder.throwable);
   }
+
+  // Regression test for HIVE-21864: concurrent getSplits(), close() and closeAll() must not fail.
+  @Test
+  public void testConcurrentAddAndCloseAndCloseAllConnections() throws Exception {
+    createTestTable("testtab1");
+
+    String url = miniHS2.getJdbcURL();
+    String user = System.getProperty("user.name");
+    String pwd = user;
+
+    InputFormat<NullWritable, Row> inputFormat = getInputFormat();
+
+    // Job configuration reused by every getSplits() call below
+    JobConf job = new JobConf(conf);
+    job.set(LlapBaseInputFormat.URL_KEY, url);
+    job.set(LlapBaseInputFormat.USER_KEY, user);
+    job.set(LlapBaseInputFormat.PWD_KEY, pwd);
+    job.set(LlapBaseInputFormat.QUERY_KEY, "select * from testtab1");
+
+    final String[] handleIds = IntStream.range(0, 20).mapToObj(i -> "handleId-" + i).toArray(String[]::new);
+    final ExceptionHolder exceptionHolder = new ExceptionHolder();
+
+    // addConnThread keeps adding connections, cycling through the handle IDs.
+    // closeConnThread closes the connection(s) of one handle ID at a time.
+    // closeAllConnThread closes all connections at once.
+    final int numIterations = 100;
+    final Iterator<String> addConnIterator = Iterables.cycle(handleIds).iterator();
+    Thread addConnThread = new Thread(() -> executeNTimes(() -> {
+      String handleId = addConnIterator.next();
+      job.set(LlapBaseInputFormat.HANDLE_ID, handleId);
+      InputSplit[] splits = inputFormat.getSplits(job, 1);
+      // Report via an exception so that executeNTimes() records the failure in exceptionHolder.
+      if (splits.length == 0) {
+        throw new IllegalStateException("No splits returned for " + handleId);
+      }
+      return null;
+    }, numIterations, 1, exceptionHolder));
+
+    final Iterator<String> removeConnIterator = Iterables.cycle(handleIds).iterator();
+    Thread closeConnThread = new Thread(() -> executeNTimes(() -> {
+      String handleId = removeConnIterator.next();
+      LlapBaseInputFormat.close(handleId);
+      return null;
+    }, numIterations, 2, exceptionHolder));
+
+    Thread closeAllConnThread = new Thread(() -> executeNTimes(() -> {
+      LlapBaseInputFormat.closeAll();
+      return null;
+    }, numIterations, 5, exceptionHolder));
+
+    addConnThread.start();
+    closeConnThread.start();
+    closeAllConnThread.start();
+    closeAllConnThread.join();
+    closeConnThread.join();
+    addConnThread.join();
+
+    Throwable throwable = exceptionHolder.throwable;
+    assertNull("Unexpected failure in testConcurrentAddAndCloseAndCloseAllConnections: " + throwable, throwable);
+  }
+
+  // Calls action noOfTimes, sleeping intervalMillis after each successful call; keeps the first failure.
+  private void executeNTimes(Callable<?> action, int noOfTimes, long intervalMillis, ExceptionHolder exceptionHolder) {
+    for (int i = 0; i < noOfTimes; i++) {
+      try {
+        action.call();
+        Thread.sleep(intervalMillis);
+      } catch (Throwable t) {
+        // Catch Throwable, not just Exception: assertion failures are Errors and must be recorded
+        // too. Several threads share the holder, so check-and-set happens under one lock.
+        synchronized (this) {
+          if (exceptionHolder.throwable == null) {
+            exceptionHolder.throwable = t;
+          }
+        }
+      }
+    }
+  }
+
 }
 
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index ff91f47..2aa82b5 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -31,6 +31,7 @@ import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -326,6 +327,10 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
     synchronized (lock) {
       handleConnections = connectionMap.remove(handleId);
     }
+    closeConnections(handleId, handleConnections);
+  }
+
+  private static void closeConnections(String handleId, List<Connection> handleConnections) {
     if (handleConnections != null) {
       LOG.debug("Closing {} connections for handle ID {}", handleConnections.size(), handleId);
       for (Connection conn : handleConnections) {
@@ -345,11 +350,13 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
    */
   public static void closeAll() {
     LOG.debug("Closing all handles");
-    for (String handleId : connectionMap.keySet()) {
-      try {
-        close(handleId);
-      } catch (Exception err) {
-        LOG.error("Error closing handle ID " + handleId, err);
+    synchronized (lock) {
+      // Entries are dropped via Iterator.remove(), the safe way to remove while iterating the map.
+      Iterator<Map.Entry<String, List<Connection>>> entries = connectionMap.entrySet().iterator();
+      while (entries.hasNext()) {
+        Map.Entry<String, List<Connection>> entry = entries.next();
+        closeConnections(entry.getKey(), entry.getValue());
+        entries.remove();
       }
     }
   }