Posted to commits@hive.apache.org by se...@apache.org on 2017/05/20 01:16:36 UTC

[1/3] hive git commit: HIVE-16671 : LLAP IO: BufferUnderflowException may happen in very rare(?) cases due to ORC end-of-CB estimation (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master 85415f7b8 -> bb2f25c1a


HIVE-16671 : LLAP IO: BufferUnderflowException may happen in very rare(?) cases due to ORC end-of-CB estimation (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5d62dc8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5d62dc8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5d62dc8a

Branch: refs/heads/master
Commit: 5d62dc8ae853517e4c477516283af67439a26f0d
Parents: 85415f7
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:22:35 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:22:35 2017 -0700

----------------------------------------------------------------------
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 81 +++++++++++++++++---
 .../io/orc/encoded/TestEncodedReaderImpl.java   | 77 +++++++++++++++++++
 2 files changed, 149 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5b2e9b5..6cd85d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import sun.misc.Cleaner;
 
 
@@ -1229,10 +1231,26 @@ class EncodedReaderImpl implements EncodedReader {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
-    int b0 = compressed.get() & 0xff;
-    int b1 = compressed.get() & 0xff;
-    int b2 = compressed.get() & 0xff;
+    int b0 = -1, b1 = -1, b2 = -1;
+    // First, read the CB header. Due to ORC estimates, ZCR, etc. this can be complex.
+    if (compressed.remaining() >= 3) {
+      // The overwhelming majority of cases will go here. Read 3 bytes. Tada!
+      b0 = compressed.get() & 0xff;
+      b1 = compressed.get() & 0xff;
+      b2 = compressed.get() & 0xff;
+    } else {
+      // Bad luck! Handle the corner cases where 3 bytes are in multiple blocks.
+      int[] bytes = new int[3];
+      current = readLengthBytesFromSmallBuffers(
+          current, cbStartOffset, bytes, badEstimates, isTracingEnabled);
+      if (current == null) return null;
+      compressed = current.getChunk();
+      b0 = bytes[0];
+      b1 = bytes[1];
+      b2 = bytes[2];
+    }
     int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
     if (chunkLength > bufferSize) {
       throw new IllegalArgumentException("Buffer size too small. size = " +
           bufferSize + " needed = " + chunkLength);
@@ -1252,7 +1270,8 @@ class EncodedReaderImpl implements EncodedReader {
           cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
     }
     if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
-      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
+      badEstimates.add(addIncompleteCompressionBuffer(
+          cbStartOffset, current, 0, isTracingEnabled));
       return null; // This is impossible to read from this chunk.
     }
 
@@ -1304,12 +1323,56 @@ class EncodedReaderImpl implements EncodedReader {
         }
         tmp.removeSelf();
       } else {
-        badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
+        badEstimates.add(addIncompleteCompressionBuffer(
+            cbStartOffset, tmp, extraChunkCount, isTracingEnabled));
+        return null; // This is impossible to read from this chunk.
+      }
+    }
+  }
+
+
+  @VisibleForTesting
+  static BufferChunk readLengthBytesFromSmallBuffers(BufferChunk first, long cbStartOffset,
+      int[] result, List<IncompleteCb> badEstimates, boolean isTracingEnabled) throws IOException {
+    if (!first.hasContiguousNext()) {
+      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, first, 0, isTracingEnabled));
+      return null; // This is impossible to read from this chunk.
+    }
+    int ix = readLengthBytes(first.getChunk(), result, 0);
+    assert ix < 3; // Otherwise we wouldn't be here.
+    DiskRangeList current = first.next;
+    first.removeSelf();
+    while (true) {
+      if (!(current instanceof BufferChunk)) {
+        throw new IOException(
+            "Trying to extend compressed block into uncompressed block " + current);
+      }
+      BufferChunk currentBc = (BufferChunk) current;
+      ix = readLengthBytes(currentBc.getChunk(), result, ix);
+      if (ix == 3) return currentBc; // Done, we have 3 bytes. Continue reading this buffer.
+      DiskRangeList tmp = current;
+      current = current.hasContiguousNext() ? current.next : null;
+      if (current != null) {
+        if (isTracingEnabled) {
+          LOG.trace("Removing partial CB " + tmp + " from ranges after copying its contents");
+        }
+        tmp.removeSelf();
+      } else {
+        badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, -1, isTracingEnabled));
         return null; // This is impossible to read from this chunk.
       }
     }
   }
 
+  private static int readLengthBytes(ByteBuffer compressed, int[] bytes, int ix) {
+    int byteCount = compressed.remaining();
+    while (byteCount > 0 && ix < 3) {
+      bytes[ix++] = compressed.get() & 0xff;
+      --byteCount;
+    }
+    return ix;
+  }
+
   private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
     if (toRelease == null) return;
     for (ByteBuffer buf : toRelease) {
@@ -1342,12 +1405,12 @@ class EncodedReaderImpl implements EncodedReader {
   }
 
 
-  private IncompleteCb addIncompleteCompressionBuffer(
-      long cbStartOffset, DiskRangeList target, int extraChunkCount) {
+  private static IncompleteCb addIncompleteCompressionBuffer(long cbStartOffset,
+      DiskRangeList target, int extraChunkCountToLog, boolean isTracingEnabled) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
     if (isTracingEnabled) {
-      LOG.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
-          + icb + " in the buffers");
+      LOG.trace("Replacing " + target + " (and " + extraChunkCountToLog
+          + " previous chunks) with " + icb + " in the buffers");
     }
     target.replaceSelfWith(icb);
     return icb;

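For reference, the three bytes gathered above form the standard ORC compressed-chunk header: a 3-byte little-endian value whose low bit marks an "original" (stored uncompressed) chunk and whose remaining 23 bits carry the chunk length, which is why the patch keeps the existing chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1) decode and only changes how the three bytes are collected when they straddle buffer boundaries. A minimal standalone sketch of that decoding (illustrative names, not part of the patch):

import java.nio.ByteBuffer;

class OrcCbHeaderSketch {
  /** Decodes one 3-byte ORC compressed-chunk header from the buffer's current position. */
  static void decode(ByteBuffer buf) {
    int b0 = buf.get() & 0xff;
    int b1 = buf.get() & 0xff;
    int b2 = buf.get() & 0xff;
    boolean isOriginal = (b0 & 0x01) == 1;                 // low bit: chunk stored uncompressed
    int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);  // remaining 23 bits: chunk length
    System.out.println("length=" + chunkLength + ", original=" + isOriginal);
  }

  public static void main(String[] args) {
    // The bytes the new test feeds in ({1, 2, 3}) decode to
    // length = (3 << 15) | (2 << 7) | (1 >> 1) = 98560, original = true.
    decode(ByteBuffer.wrap(new byte[] { 1, 2, 3 }));
  }
}
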
http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
new file mode 100644
index 0000000..28ca441
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.orc.impl.BufferChunk;
+import org.junit.Test;
+
+public class TestEncodedReaderImpl {
+  @Test
+  public void testReadLength() throws IOException {
+    ByteBuffer one = ByteBuffer.wrap(new byte[] { 1 }), two = ByteBuffer.wrap(new byte[] { 2 }),
+        three = ByteBuffer.wrap(new byte[] { 3 }), twoThree = ByteBuffer.wrap(new byte[] { 2, 3 }),
+        oneTwo = ByteBuffer.wrap(new byte[] { 1, 2 });
+    BufferChunk bc = new BufferChunk(one, 0);
+    int[] result = new int[3];
+    List<IncompleteCb> l = new ArrayList<>();
+    BufferChunk rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    one.position(0);
+    bc.insertAfter(new BufferChunk(two, 1));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    one.position(0);
+    two.position(0);
+    bc.insertAfter(new BufferChunk(two, 1)).insertAfter(new BufferChunk(three, 2));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+    one.position(0);
+    bc.insertAfter(new BufferChunk(twoThree, 1));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+    bc = new BufferChunk(oneTwo, 0);
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    three.position(0);
+    bc.insertAfter(new BufferChunk(three, 2));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+  }
+}


[2/3] hive git commit: HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)

Posted by se...@apache.org.
HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d951fa4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d951fa4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d951fa4

Branch: refs/heads/master
Commit: 8d951fa4e0665942c4c1cb44a7914f70b0604f2d
Parents: 5d62dc8
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:24:27 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:24:27 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/tez/DagUtils.java       | 29 ++++++++++++++++----
 .../hive/ql/exec/tez/TezSessionState.java       |  5 ++--
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 14 +++++++---
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    | 10 +++----
 4 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index b0457be..b6e55c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -31,6 +31,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -847,12 +848,15 @@ public class DagUtils {
       String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf));
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE, getTempArchivesFromConf(conf));
+    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE,
+        getTempFilesFromConf(conf), null);
+    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE,
+        getTempArchivesFromConf(conf), null);
     return tmpResources;
   }
 
-  private static String[] getTempFilesFromConf(Configuration conf) {
+  public static String[] getTempFilesFromConf(Configuration conf) {
+    if (conf == null) return new String[0]; // In tests.
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
     if (StringUtils.isNotBlank(addedFiles)) {
       HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
@@ -888,21 +892,34 @@ public class DagUtils {
    * @throws LoginException when getDefaultDestDir fails with the same exception
    */
   public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf,
-      String[] inputOutputJars) throws IOException, LoginException {
+      String[] inputOutputJars, String[] skipJars) throws IOException, LoginException {
     if (inputOutputJars == null) return null;
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
-    addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, inputOutputJars);
+    addTempResources(conf, tmpResources, hdfsDirPathStr,
+        LocalResourceType.FILE, inputOutputJars, skipJars);
     return tmpResources;
   }
 
   private void addTempResources(Configuration conf,
       List<LocalResource> tmpResources, String hdfsDirPathStr,
       LocalResourceType type,
-      String[] files) throws IOException {
+      String[] files, String[] skipFiles) throws IOException {
+    HashSet<Path> skipFileSet = null;
+    if (skipFiles != null) {
+      skipFileSet = new HashSet<>();
+      for (String skipFile : skipFiles) {
+        if (StringUtils.isBlank(skipFile)) continue;
+        skipFileSet.add(new Path(skipFile));
+      }
+    }
     for (String file : files) {
       if (!StringUtils.isNotBlank(file)) {
         continue;
       }
+      if (skipFileSet != null && skipFileSet.contains(new Path(file))) {
+        LOG.info("Skipping vertex resource " + file + " that already exists in the session");
+        continue;
+      }
       Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new Path(file)));
       LocalResource localResource = localizeResource(new Path(file),
           hdfsFilePath, type, conf);

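The fix works by treating the session-level files from the conf as a skip list when localizing vertex-level resources: each candidate file string is turned into a Path so that equivalent spellings compare equal, and anything already present in the session set is not localized again for the vertex. A small standalone sketch of the same filtering idea (illustrative names, not Hive's actual API):

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.fs.Path;

class SkipSessionFilesSketch {
  /** Returns the vertex files that still need localization, skipping session duplicates. */
  static List<String> filesToLocalize(String[] vertexFiles, String[] sessionFiles) {
    Set<Path> sessionPaths = new HashSet<>();
    for (String f : sessionFiles) {
      if (f == null || f.trim().isEmpty()) continue;
      sessionPaths.add(new Path(f));                      // Path normalizes the path portion
    }
    List<String> result = new ArrayList<>();
    for (String f : vertexFiles) {
      if (f == null || f.trim().isEmpty()) continue;
      if (sessionPaths.contains(new Path(f))) continue;   // already localized for the session
      result.add(f);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(filesToLocalize(
        new String[] { "hdfs:///tmp/a.jar", "hdfs:///tmp/b.jar" },
        new String[] { "hdfs:///tmp/a.jar" }));           // prints [hdfs:///tmp/b.jar]
  }
}
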
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 036e918..fe5c6a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -476,9 +476,10 @@ public class TezSessionState {
       localizedResources.addAll(lrs);
     }
 
-    // these are local resources that are set through the mr "tmpjars" property
+    // these are local resources that are set through the mr "tmpjars" property; skip session files.
     List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
-      additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]));
+      additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]),
+      DagUtils.getTempFilesFromConf(conf));
 
     if (handlerLr != null) {
       localizedResources.addAll(handlerLr);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 1c84c6a..3356dc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -123,6 +123,7 @@ public class TezTask extends Task<TezWork> {
     return counters;
   }
 
+
   @Override
   public int execute(DriverContext driverContext) {
     int rc = 1;
@@ -161,8 +162,12 @@ public class TezTask extends Task<TezWork> {
         // create the tez tmp dir
         scratchDir = utils.createTezDir(scratchDir, conf);
 
+        // This is used to compare global and vertex resources. Global resources are originally
+        // derived from session conf via localizeTempFilesFromConf. So, use that here.
+        Configuration sessionConf =
+            (session != null && session.getConf() != null) ? session.getConf() : conf;
         Map<String,LocalResource> inputOutputLocalResources =
-            getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+            getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
 
         // Ensure the session is open and has the necessary local resources
         updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
@@ -273,10 +278,11 @@ public class TezTask extends Task<TezWork> {
    * Converted the list of jars into local resources
    */
   Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
-      String[] inputOutputJars) throws Exception {
+      String[] inputOutputJars, Configuration sessionConf) throws Exception {
     final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
-    final List<LocalResource> localResources = utils.localizeTempFiles(
-        scratchDir.toString(), jobConf, inputOutputJars);
+    // Skip the files already in session local resources...
+    final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(),
+        jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf));
     if (null != localResources) {
       for (LocalResource lr : localResources) {
         resources.put(utils.getBaseName(lr), lr);

http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 2b52056..70fedb7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -245,7 +245,7 @@ public class TestTezTask {
     final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
     resMap.put("foo.jar", res);
 
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
         .thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
     when(sessionState.isOpen()).thenReturn(true);
@@ -264,7 +264,7 @@ public class TestTezTask {
     resMap.put("foo.jar", res);
     DAG dag = mock(DAG.class);
 
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
         .thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
     when(sessionState.isOpen()).thenReturn(true);
@@ -282,11 +282,11 @@ public class TestTezTask {
     final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
     resMap.put("foo.jar", res);
 
-    when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
-        .thenReturn(resources);
+    when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars),
+        Mockito.<String[]>any())).thenReturn(resources);
     when(utils.getBaseName(res)).thenReturn("foo.jar");
 
-    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+    assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null));
   }
 
   @Test


[3/3] hive git commit: HIVE-16724 : increase session timeout for LLAP ZK token manager (Sergey Shelukhin, reviewed by Jason Dere)

Posted by se...@apache.org.
HIVE-16724 : increase session timeout for LLAP ZK token manager (Sergey Shelukhin, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb2f25c1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb2f25c1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb2f25c1

Branch: refs/heads/master
Commit: bb2f25c1a189b031a9601cb00a3dc2f5d6f5ac4a
Parents: 8d951fa
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:34:24 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:34:24 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  2 ++
 .../hive/llap/security/SecretManager.java       | 20 ++++++++++++++------
 2 files changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/bb2f25c1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1c37b6e..7dedd23 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3074,6 +3074,8 @@ public class HiveConf extends Configuration {
         "By default, the clients are required to provide tokens to access HDFS/etc."),
     LLAP_ZKSM_ZK_CONNECTION_STRING("hive.llap.zk.sm.connectionString", "",
         "ZooKeeper connection string for ZooKeeper SecretManager."),
+    LLAP_ZKSM_ZK_SESSION_TIMEOUT("hive.llap.zk.sm.session.timeout", "40s", new TimeValidator(
+        TimeUnit.MILLISECONDS), "ZooKeeper session timeout for ZK SecretManager."),
     LLAP_ZK_REGISTRY_USER("hive.llap.zk.registry.user", "",
         "In the LLAP ZooKeeper-based registry, specifies the username in the Zookeeper path.\n" +
         "This should be the hive user or whichever user is running the LLAP daemon."),

http://git-wip-us.apache.org/repos/asf/hive/blob/bb2f25c1/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 08f8b32..8e4f233 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -175,18 +175,26 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
     zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
     zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
     try {
-      zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL,
+      zkConf.set(ZK_DTSM_ZK_KERBEROS_PRINCIPAL,
           SecurityUtil.getServerPrincipal(principal, "0.0.0.0"));
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
+    zkConf.set(ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
     String zkPath = "zkdtsm_" + clusterId;
     LOG.info("Using {} as ZK secret manager path", zkPath);
-    zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, zkPath);
+    zkConf.set(ZK_DTSM_ZNODE_WORKING_PATH, zkPath);
     // Hardcode SASL here. ZKDTSM only supports none or sasl and we never want none.
-    zkConf.set(SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
-    setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
+    zkConf.set(ZK_DTSM_ZK_AUTH_TYPE, "sasl");
+    long sessionTimeoutMs = HiveConf.getTimeVar(
+        zkConf, ConfVars.LLAP_ZKSM_ZK_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+    long newRetryCount =
+        (ZK_DTSM_ZK_NUM_RETRIES_DEFAULT * sessionTimeoutMs) / ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT;
+    long connTimeoutMs = Math.max(sessionTimeoutMs, ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT);
+    zkConf.set(ZK_DTSM_ZK_SESSION_TIMEOUT, Long.toString(sessionTimeoutMs));
+    zkConf.set(ZK_DTSM_ZK_CONNECTION_TIMEOUT, Long.toString(connTimeoutMs));
+    zkConf.set(ZK_DTSM_ZK_NUM_RETRIES, Long.toString(newRetryCount));
+    setZkConfIfNotSet(zkConf, ZK_DTSM_ZK_CONNECTION_STRING,
         HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
 
     UserGroupInformation zkUgi = null;
@@ -201,7 +209,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
   public static SecretManager createSecretManager(Configuration conf, String clusterId) {
     String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
         llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
-    return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+    return createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
   }
 
   public static SecretManager createSecretManager(
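The new arithmetic scales the ZooKeeper retry count in proportion to the longer session timeout and keeps the connection timeout at least as long as the session timeout. A hedged, standalone version of that computation; the DEFAULT_* constants below are assumptions standing in for the ZK_DTSM_*_DEFAULT values inherited from Hadoop's ZKDelegationTokenSecretManager (which SecretManager extends), so check that class for the real numbers:

class ZkTimeoutScalingSketch {
  // Assumed stand-ins for the ZK_DTSM_*_DEFAULT constants inherited from
  // Hadoop's ZKDelegationTokenSecretManager; verify the actual values there.
  static final long DEFAULT_SESSION_TIMEOUT_MS = 10_000L;
  static final long DEFAULT_CONNECTION_TIMEOUT_MS = 10_000L;
  static final long DEFAULT_NUM_RETRIES = 3L;

  public static void main(String[] args) {
    long sessionTimeoutMs = 40_000L;  // hive.llap.zk.sm.session.timeout defaults to "40s"
    // Scale the default retry count in proportion to the larger session timeout.
    long newRetryCount = (DEFAULT_NUM_RETRIES * sessionTimeoutMs) / DEFAULT_SESSION_TIMEOUT_MS;
    // Never let the connection timeout be shorter than the session timeout.
    long connTimeoutMs = Math.max(sessionTimeoutMs, DEFAULT_CONNECTION_TIMEOUT_MS);
    System.out.println("retries=" + newRetryCount + ", connTimeoutMs=" + connTimeoutMs);
    // With the assumed defaults this prints retries=12, connTimeoutMs=40000.
  }
}

The timeout itself is tunable per cluster: setting hive.llap.zk.sm.session.timeout (for example to 60s), or calling HiveConf.setVar with ConfVars.LLAP_ZKSM_ZK_SESSION_TIMEOUT, feeds the same HiveConf.getTimeVar call shown in the diff, since the new entry uses a TimeValidator in milliseconds.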