You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2017/05/20 01:16:36 UTC
[1/3] hive git commit: HIVE-16671 : LLAP IO: BufferUnderflowException
may happen in very rare(?) cases due to ORC end-of-CB estimation (Sergey
Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master 85415f7b8 -> bb2f25c1a
HIVE-16671 : LLAP IO: BufferUnderflowException may happen in very rare(?) cases due to ORC end-of-CB estimation (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5d62dc8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5d62dc8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5d62dc8a
Branch: refs/heads/master
Commit: 5d62dc8ae853517e4c477516283af67439a26f0d
Parents: 85415f7
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:22:35 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:22:35 2017 -0700
----------------------------------------------------------------------
.../ql/io/orc/encoded/EncodedReaderImpl.java | 81 +++++++++++++++++---
.../io/orc/encoded/TestEncodedReaderImpl.java | 77 +++++++++++++++++++
2 files changed, 149 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5b2e9b5..6cd85d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.orc.OrcProto;
+import com.google.common.annotations.VisibleForTesting;
+
import sun.misc.Cleaner;
@@ -1229,10 +1231,26 @@ class EncodedReaderImpl implements EncodedReader {
ByteBuffer slice = null;
ByteBuffer compressed = current.getChunk();
long cbStartOffset = current.getOffset();
- int b0 = compressed.get() & 0xff;
- int b1 = compressed.get() & 0xff;
- int b2 = compressed.get() & 0xff;
+ int b0 = -1, b1 = -1, b2 = -1;
+ // First, read the CB header. Due to ORC estimates, ZCR, etc. this can be complex.
+ if (compressed.remaining() >= 3) {
+ // The overwhelming majority of cases will go here. Read 3 bytes. Tada!
+ b0 = compressed.get() & 0xff;
+ b1 = compressed.get() & 0xff;
+ b2 = compressed.get() & 0xff;
+ } else {
+ // Bad luck! Handle the corner cases where 3 bytes are in multiple blocks.
+ int[] bytes = new int[3];
+ current = readLengthBytesFromSmallBuffers(
+ current, cbStartOffset, bytes, badEstimates, isTracingEnabled);
+ if (current == null) return null;
+ compressed = current.getChunk();
+ b0 = bytes[0];
+ b1 = bytes[1];
+ b2 = bytes[2];
+ }
int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
if (chunkLength > bufferSize) {
throw new IllegalArgumentException("Buffer size too small. size = " +
bufferSize + " needed = " + chunkLength);
@@ -1252,7 +1270,8 @@ class EncodedReaderImpl implements EncodedReader {
cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
}
if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
- badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
+ badEstimates.add(addIncompleteCompressionBuffer(
+ cbStartOffset, current, 0, isTracingEnabled));
return null; // This is impossible to read from this chunk.
}
@@ -1304,12 +1323,56 @@ class EncodedReaderImpl implements EncodedReader {
}
tmp.removeSelf();
} else {
- badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
+ badEstimates.add(addIncompleteCompressionBuffer(
+ cbStartOffset, tmp, extraChunkCount, isTracingEnabled));
+ return null; // This is impossible to read from this chunk.
+ }
+ }
+ }
+
+
+ @VisibleForTesting
+ static BufferChunk readLengthBytesFromSmallBuffers(BufferChunk first, long cbStartOffset,
+ int[] result, List<IncompleteCb> badEstimates, boolean isTracingEnabled) throws IOException {
+ if (!first.hasContiguousNext()) {
+ badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, first, 0, isTracingEnabled));
+ return null; // This is impossible to read from this chunk.
+ }
+ int ix = readLengthBytes(first.getChunk(), result, 0);
+ assert ix < 3; // Otherwise we wouldn't be here.
+ DiskRangeList current = first.next;
+ first.removeSelf();
+ while (true) {
+ if (!(current instanceof BufferChunk)) {
+ throw new IOException(
+ "Trying to extend compressed block into uncompressed block " + current);
+ }
+ BufferChunk currentBc = (BufferChunk) current;
+ ix = readLengthBytes(currentBc.getChunk(), result, ix);
+ if (ix == 3) return currentBc; // Done, we have 3 bytes. Continue reading this buffer.
+ DiskRangeList tmp = current;
+ current = current.hasContiguousNext() ? current.next : null;
+ if (current != null) {
+ if (isTracingEnabled) {
+ LOG.trace("Removing partial CB " + tmp + " from ranges after copying its contents");
+ }
+ tmp.removeSelf();
+ } else {
+ badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, -1, isTracingEnabled));
return null; // This is impossible to read from this chunk.
}
}
}
+ private static int readLengthBytes(ByteBuffer compressed, int[] bytes, int ix) {
+ int byteCount = compressed.remaining();
+ while (byteCount > 0 && ix < 3) {
+ bytes[ix++] = compressed.get() & 0xff;
+ --byteCount;
+ }
+ return ix;
+ }
+
private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
if (toRelease == null) return;
for (ByteBuffer buf : toRelease) {
@@ -1342,12 +1405,12 @@ class EncodedReaderImpl implements EncodedReader {
}
- private IncompleteCb addIncompleteCompressionBuffer(
- long cbStartOffset, DiskRangeList target, int extraChunkCount) {
+ private static IncompleteCb addIncompleteCompressionBuffer(long cbStartOffset,
+ DiskRangeList target, int extraChunkCountToLog, boolean isTracingEnabled) {
IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
if (isTracingEnabled) {
- LOG.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
- + icb + " in the buffers");
+ LOG.trace("Replacing " + target + " (and " + extraChunkCountToLog
+ + " previous chunks) with " + icb + " in the buffers");
}
target.replaceSelfWith(icb);
return icb;
http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
new file mode 100644
index 0000000..28ca441
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.orc.impl.BufferChunk;
+import org.junit.Test;
+
+public class TestEncodedReaderImpl {
+ @Test
+ public void testReadLength() throws IOException {
+ ByteBuffer one = ByteBuffer.wrap(new byte[] { 1 }), two = ByteBuffer.wrap(new byte[] { 2 }),
+ three = ByteBuffer.wrap(new byte[] { 3 }), twoThree = ByteBuffer.wrap(new byte[] { 2, 3 }),
+ oneTwo = ByteBuffer.wrap(new byte[] { 1, 2 });
+ BufferChunk bc = new BufferChunk(one, 0);
+ int[] result = new int[3];
+ List<IncompleteCb> l = new ArrayList<>();
+ BufferChunk rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+ assertNull(rv);
+ one.position(0);
+ bc.insertAfter(new BufferChunk(two, 1));
+ Arrays.fill(result, -1);
+ rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+ assertNull(rv);
+ one.position(0);
+ two.position(0);
+ bc.insertAfter(new BufferChunk(two, 1)).insertAfter(new BufferChunk(three, 2));
+ Arrays.fill(result, -1);
+ rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+ assertNotNull(rv);
+ for (int i = 0; i < result.length; ++i) {
+ assertEquals(i + 1, result[i]);
+ }
+ one.position(0);
+ bc.insertAfter(new BufferChunk(twoThree, 1));
+ Arrays.fill(result, -1);
+ rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+ assertNotNull(rv);
+ for (int i = 0; i < result.length; ++i) {
+ assertEquals(i + 1, result[i]);
+ }
+ bc = new BufferChunk(oneTwo, 0);
+ Arrays.fill(result, -1);
+ rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+ assertNull(rv);
+ three.position(0);
+ bc.insertAfter(new BufferChunk(three, 2));
+ Arrays.fill(result, -1);
+ rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+ assertNotNull(rv);
+ for (int i = 0; i < result.length; ++i) {
+ assertEquals(i + 1, result[i]);
+ }
+ }
+}
[2/3] hive git commit: HIVE-16703 : Hive may add the same file to the
session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)
Posted by se...@apache.org.
HIVE-16703 : Hive may add the same file to the session and vertex in Tez (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d951fa4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d951fa4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d951fa4
Branch: refs/heads/master
Commit: 8d951fa4e0665942c4c1cb44a7914f70b0604f2d
Parents: 5d62dc8
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:24:27 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:24:27 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/tez/DagUtils.java | 29 ++++++++++++++++----
.../hive/ql/exec/tez/TezSessionState.java | 5 ++--
.../apache/hadoop/hive/ql/exec/tez/TezTask.java | 14 +++++++---
.../hadoop/hive/ql/exec/tez/TestTezTask.java | 10 +++----
4 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index b0457be..b6e55c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -31,6 +31,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -847,12 +848,15 @@ public class DagUtils {
String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
List<LocalResource> tmpResources = new ArrayList<LocalResource>();
- addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, getTempFilesFromConf(conf));
- addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE, getTempArchivesFromConf(conf));
+ addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE,
+ getTempFilesFromConf(conf), null);
+ addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.ARCHIVE,
+ getTempArchivesFromConf(conf), null);
return tmpResources;
}
- private static String[] getTempFilesFromConf(Configuration conf) {
+ public static String[] getTempFilesFromConf(Configuration conf) {
+ if (conf == null) return new String[0]; // In tests.
String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
if (StringUtils.isNotBlank(addedFiles)) {
HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
@@ -888,21 +892,34 @@ public class DagUtils {
* @throws LoginException when getDefaultDestDir fails with the same exception
*/
public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf,
- String[] inputOutputJars) throws IOException, LoginException {
+ String[] inputOutputJars, String[] skipJars) throws IOException, LoginException {
if (inputOutputJars == null) return null;
List<LocalResource> tmpResources = new ArrayList<LocalResource>();
- addTempResources(conf, tmpResources, hdfsDirPathStr, LocalResourceType.FILE, inputOutputJars);
+ addTempResources(conf, tmpResources, hdfsDirPathStr,
+ LocalResourceType.FILE, inputOutputJars, skipJars);
return tmpResources;
}
private void addTempResources(Configuration conf,
List<LocalResource> tmpResources, String hdfsDirPathStr,
LocalResourceType type,
- String[] files) throws IOException {
+ String[] files, String[] skipFiles) throws IOException {
+ HashSet<Path> skipFileSet = null;
+ if (skipFiles != null) {
+ skipFileSet = new HashSet<>();
+ for (String skipFile : skipFiles) {
+ if (StringUtils.isBlank(skipFile)) continue;
+ skipFileSet.add(new Path(skipFile));
+ }
+ }
for (String file : files) {
if (!StringUtils.isNotBlank(file)) {
continue;
}
+ if (skipFileSet != null && skipFileSet.contains(new Path(file))) {
+ LOG.info("Skipping vertex resource " + file + " that already exists in the session");
+ continue;
+ }
Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new Path(file)));
LocalResource localResource = localizeResource(new Path(file),
hdfsFilePath, type, conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index 036e918..fe5c6a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -476,9 +476,10 @@ public class TezSessionState {
localizedResources.addAll(lrs);
}
- // these are local resources that are set through the mr "tmpjars" property
+ // these are local resources that are set through the mr "tmpjars" property; skip session files.
List<LocalResource> handlerLr = utils.localizeTempFiles(dir, conf,
- additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]));
+ additionalFilesNotFromConf.toArray(new String[additionalFilesNotFromConf.size()]),
+ DagUtils.getTempFilesFromConf(conf));
if (handlerLr != null) {
localizedResources.addAll(handlerLr);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 1c84c6a..3356dc9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -123,6 +123,7 @@ public class TezTask extends Task<TezWork> {
return counters;
}
+
@Override
public int execute(DriverContext driverContext) {
int rc = 1;
@@ -161,8 +162,12 @@ public class TezTask extends Task<TezWork> {
// create the tez tmp dir
scratchDir = utils.createTezDir(scratchDir, conf);
+ // This is used to compare global and vertex resources. Global resources are originally
+ // derived from session conf via localizeTempFilesFromConf. So, use that here.
+ Configuration sessionConf =
+ (session != null && session.getConf() != null) ? session.getConf() : conf;
Map<String,LocalResource> inputOutputLocalResources =
- getExtraLocalResources(jobConf, scratchDir, inputOutputJars);
+ getExtraLocalResources(jobConf, scratchDir, inputOutputJars, sessionConf);
// Ensure the session is open and has the necessary local resources
updateSession(session, jobConf, scratchDir, inputOutputJars, inputOutputLocalResources);
@@ -273,10 +278,11 @@ public class TezTask extends Task<TezWork> {
* Converted the list of jars into local resources
*/
Map<String,LocalResource> getExtraLocalResources(JobConf jobConf, Path scratchDir,
- String[] inputOutputJars) throws Exception {
+ String[] inputOutputJars, Configuration sessionConf) throws Exception {
final Map<String,LocalResource> resources = new HashMap<String,LocalResource>();
- final List<LocalResource> localResources = utils.localizeTempFiles(
- scratchDir.toString(), jobConf, inputOutputJars);
+ // Skip the files already in session local resources...
+ final List<LocalResource> localResources = utils.localizeTempFiles(scratchDir.toString(),
+ jobConf, inputOutputJars, DagUtils.getTempFilesFromConf(sessionConf));
if (null != localResources) {
for (LocalResource lr : localResources) {
resources.put(utils.getBaseName(lr), lr);
http://git-wip-us.apache.org/repos/asf/hive/blob/8d951fa4/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 2b52056..70fedb7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -245,7 +245,7 @@ public class TestTezTask {
final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
resMap.put("foo.jar", res);
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
.thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
when(sessionState.isOpen()).thenReturn(true);
@@ -264,7 +264,7 @@ public class TestTezTask {
resMap.put("foo.jar", res);
DAG dag = mock(DAG.class);
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
+ when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars, null))
.thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
when(sessionState.isOpen()).thenReturn(true);
@@ -282,11 +282,11 @@ public class TestTezTask {
final Map<String,LocalResource> resMap = new HashMap<String,LocalResource>();
resMap.put("foo.jar", res);
- when(utils.localizeTempFiles(path.toString(), conf, inputOutputJars))
- .thenReturn(resources);
+ when(utils.localizeTempFiles(eq(path.toString()), eq(conf), eq(inputOutputJars),
+ Mockito.<String[]>any())).thenReturn(resources);
when(utils.getBaseName(res)).thenReturn("foo.jar");
- assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars));
+ assertEquals(resMap, task.getExtraLocalResources(conf, path, inputOutputJars, null));
}
@Test
[3/3] hive git commit: HIVE-16724 : increase session timeout for LLAP
ZK token manager (Sergey Shelukhin, reviewed by Jason Dere)
Posted by se...@apache.org.
HIVE-16724 : increase session timeout for LLAP ZK token manager (Sergey Shelukhin, reviewed by Jason Dere)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/bb2f25c1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/bb2f25c1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/bb2f25c1
Branch: refs/heads/master
Commit: bb2f25c1a189b031a9601cb00a3dc2f5d6f5ac4a
Parents: 8d951fa
Author: sergey <se...@apache.org>
Authored: Fri May 19 17:34:24 2017 -0700
Committer: sergey <se...@apache.org>
Committed: Fri May 19 17:34:24 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 ++
.../hive/llap/security/SecretManager.java | 20 ++++++++++++++------
2 files changed, 16 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/bb2f25c1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1c37b6e..7dedd23 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3074,6 +3074,8 @@ public class HiveConf extends Configuration {
"By default, the clients are required to provide tokens to access HDFS/etc."),
LLAP_ZKSM_ZK_CONNECTION_STRING("hive.llap.zk.sm.connectionString", "",
"ZooKeeper connection string for ZooKeeper SecretManager."),
+ LLAP_ZKSM_ZK_SESSION_TIMEOUT("hive.llap.zk.sm.session.timeout", "40s", new TimeValidator(
+ TimeUnit.MILLISECONDS), "ZooKeeper session timeout for ZK SecretManager."),
LLAP_ZK_REGISTRY_USER("hive.llap.zk.registry.user", "",
"In the LLAP ZooKeeper-based registry, specifies the username in the Zookeeper path.\n" +
"This should be the hive user or whichever user is running the LLAP daemon."),
http://git-wip-us.apache.org/repos/asf/hive/blob/bb2f25c1/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
index 08f8b32..8e4f233 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/security/SecretManager.java
@@ -175,18 +175,26 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
zkConf.setLong(DelegationTokenManager.MAX_LIFETIME, tokenLifetime);
zkConf.setLong(DelegationTokenManager.RENEW_INTERVAL, tokenLifetime);
try {
- zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_PRINCIPAL,
+ zkConf.set(ZK_DTSM_ZK_KERBEROS_PRINCIPAL,
SecurityUtil.getServerPrincipal(principal, "0.0.0.0"));
} catch (IOException e) {
throw new RuntimeException(e);
}
- zkConf.set(SecretManager.ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
+ zkConf.set(ZK_DTSM_ZK_KERBEROS_KEYTAB, keyTab);
String zkPath = "zkdtsm_" + clusterId;
LOG.info("Using {} as ZK secret manager path", zkPath);
- zkConf.set(SecretManager.ZK_DTSM_ZNODE_WORKING_PATH, zkPath);
+ zkConf.set(ZK_DTSM_ZNODE_WORKING_PATH, zkPath);
// Hardcode SASL here. ZKDTSM only supports none or sasl and we never want none.
- zkConf.set(SecretManager.ZK_DTSM_ZK_AUTH_TYPE, "sasl");
- setZkConfIfNotSet(zkConf, SecretManager.ZK_DTSM_ZK_CONNECTION_STRING,
+ zkConf.set(ZK_DTSM_ZK_AUTH_TYPE, "sasl");
+ long sessionTimeoutMs = HiveConf.getTimeVar(
+ zkConf, ConfVars.LLAP_ZKSM_ZK_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+ long newRetryCount =
+ (ZK_DTSM_ZK_NUM_RETRIES_DEFAULT * sessionTimeoutMs) / ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT;
+ long connTimeoutMs = Math.max(sessionTimeoutMs, ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT);
+ zkConf.set(ZK_DTSM_ZK_SESSION_TIMEOUT, Long.toString(sessionTimeoutMs));
+ zkConf.set(ZK_DTSM_ZK_CONNECTION_TIMEOUT, Long.toString(connTimeoutMs));
+ zkConf.set(ZK_DTSM_ZK_NUM_RETRIES, Long.toString(newRetryCount));
+ setZkConfIfNotSet(zkConf, ZK_DTSM_ZK_CONNECTION_STRING,
HiveConf.getVar(zkConf, ConfVars.LLAP_ZKSM_ZK_CONNECTION_STRING));
UserGroupInformation zkUgi = null;
@@ -201,7 +209,7 @@ public class SecretManager extends ZKDelegationTokenSecretManager<LlapTokenIdent
public static SecretManager createSecretManager(Configuration conf, String clusterId) {
String llapPrincipal = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_PRINCIPAL),
llapKeytab = HiveConf.getVar(conf, ConfVars.LLAP_KERBEROS_KEYTAB_FILE);
- return SecretManager.createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
+ return createSecretManager(conf, llapPrincipal, llapKeytab, clusterId);
}
public static SecretManager createSecretManager(