You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/07/18 20:49:20 UTC
hive git commit: HIVE-17091: "Timed out getting readerEvents" error from external LLAP client (Jason Dere, reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master 358585ba6 -> d4c496bf5
HIVE-17091: "Timed out getting readerEvents" error from external LLAP client (Jason Dere, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d4c496bf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d4c496bf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d4c496bf
Branch: refs/heads/master
Commit: d4c496bf5e8c691bbc130e11a854321fb6a25e3e
Parents: 358585b
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Jul 18 13:47:52 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Jul 18 13:47:52 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/LlapBaseRecordReader.java | 14 ++++----
.../ext/LlapTaskUmbilicalExternalClient.java | 12 +++++++
.../hadoop/hive/llap/io/ChunkedInputStream.java | 16 +++++----
.../hive/llap/io/ChunkedOutputStream.java | 9 +++--
.../hive/llap/io/TestChunkedInputStream.java | 6 ++--
.../hadoop/hive/llap/ChannelOutputStream.java | 37 +++++++-------------
.../hive/llap/LlapOutputFormatService.java | 4 +--
.../hadoop/hive/llap/LlapRecordWriter.java | 6 ++--
8 files changed, 56 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index cb38839..af3db8c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.llap;
+import com.google.common.base.Preconditions;
+
import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.EOFException;
@@ -55,21 +57,19 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
protected Thread readerThread = null;
protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
- protected final long timeout;
protected final Closeable client;
private final Closeable socket;
private boolean closed = false;
public LlapBaseRecordReader(InputStream in, Schema schema,
Class<V> clazz, JobConf job, Closeable client, Closeable socket) {
- this.cin = new ChunkedInputStream(in); // Save so we can verify end of stream
+ String clientId = (client == null ? "" : client.toString());
+ this.cin = new ChunkedInputStream(in, clientId); // Save so we can verify end of stream
// We need mark support - wrap with BufferedInputStream.
din = new DataInputStream(new BufferedInputStream(cin));
this.schema = schema;
this.clazz = clazz;
this.readerThread = Thread.currentThread();
- this.timeout = 3 * HiveConf.getTimeVar(job,
- HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
this.client = client;
this.socket = socket;
}
@@ -273,10 +273,8 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
protected ReaderEvent getReaderEvent() throws IOException {
try {
- ReaderEvent event = readerEvents.poll(timeout, TimeUnit.MILLISECONDS);
- if (event == null) {
- throw new IOException("Timed out getting readerEvents");
- }
+ ReaderEvent event = readerEvents.take();
+ Preconditions.checkNotNull(event);
return event;
} catch (InterruptedException ie) {
throw new RuntimeException("Interrupted while getting readerEvents, not expected: " + ie.getMessage(), ie);
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 4304b52..ee7d0d3 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -320,6 +320,18 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
this.requestInfo.lastHeartbeat.set(lastHeartbeat);
}
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("LlapTaskUmbilicalExternalClient");
+ if (requestInfo != null) {
+ sb.append("(");
+ sb.append(requestInfo.taskAttemptId);
+ sb.append(")");
+ }
+ return sb.toString();
+ }
+
// Periodic task to time out submitted tasks that have not been updated with umbilical heartbeat.
private static class HeartbeatCheckTask implements Runnable {
LlapTaskUmbilicalExternalImpl umbilicalImpl;
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
index 4e1c65f..1c97403 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
@@ -35,13 +35,17 @@ public class ChunkedInputStream extends InputStream {
private int unreadBytes = 0; // Bytes remaining in the current chunk of data
private byte[] singleByte = new byte[1];
private boolean endOfData = false;
+ private String id;
- public ChunkedInputStream(InputStream in) {
+ public ChunkedInputStream(InputStream in, String id) {
din = new DataInputStream(in);
+ this.id = id;
+ LOG.debug("Creating chunked input for {}", id);
}
@Override
public void close() throws IOException {
+ LOG.debug("{}: Closing chunked input.", id);
din.close();
}
@@ -56,7 +60,7 @@ public class ChunkedInputStream extends InputStream {
int bytesRead = 0;
if (len < 0) {
- throw new IllegalArgumentException("Negative read length");
+ throw new IllegalArgumentException(id + ": Negative read length");
} else if (len == 0) {
return 0;
}
@@ -67,15 +71,15 @@ public class ChunkedInputStream extends InputStream {
// Find the next chunk size
unreadBytes = din.readInt();
if (LOG.isDebugEnabled()) {
- LOG.debug("Chunk size " + unreadBytes);
+ LOG.debug("{}: Chunk size {}", id, unreadBytes);
}
if (unreadBytes == 0) {
- LOG.debug("Hit end of data");
+ LOG.debug("{}: Hit end of data", id);
endOfData = true;
return -1;
}
} catch (IOException err) {
- throw new IOException("Error while attempting to read chunk length", err);
+ throw new IOException(id + ": Error while attempting to read chunk length", err);
}
}
@@ -83,7 +87,7 @@ public class ChunkedInputStream extends InputStream {
try {
din.readFully(b, off, bytesToRead);
} catch (IOException err) {
- throw new IOException("Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
+ throw new IOException(id + ": Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
}
unreadBytes -= bytesToRead;
bytesRead += bytesToRead;
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
index 31815d5..3a82915 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
@@ -35,13 +35,16 @@ public class ChunkedOutputStream extends OutputStream {
private byte[] singleByte = new byte[1];
private byte[] buffer;
private int bufPos = 0;
+ private String id;
- public ChunkedOutputStream(OutputStream out, int bufSize) {
+ public ChunkedOutputStream(OutputStream out, int bufSize, String id) {
+ LOG.debug("Creating chunked input stream: {}", id);
if (bufSize <= 0) {
throw new IllegalArgumentException("Positive bufSize required, was " + bufSize);
}
buffer = new byte[bufSize];
dout = new DataOutputStream(out);
+ this.id = id;
}
@Override
@@ -74,7 +77,7 @@ public class ChunkedOutputStream extends OutputStream {
// Write final 0-length chunk
writeChunk();
- LOG.debug("ChunkedOutputStream: Closing underlying output stream.");
+ LOG.debug("{}: Closing underlying output stream.", id);
dout.close();
}
@@ -89,7 +92,7 @@ public class ChunkedOutputStream extends OutputStream {
private void writeChunk() throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Writing chunk of size " + bufPos);
+ LOG.debug("{}: Writing chunk of size {}", id, bufPos);
}
// First write chunk length
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
index 47209c2..ebbf3a4 100644
--- a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
@@ -150,8 +150,8 @@ public class TestChunkedInputStream {
pout = new PipedOutputStream();
pin = new PipedInputStream(pout);
if (useChunkedStream) {
- out = new ChunkedOutputStream(pout, bufferSize);
- in = new ChunkedInputStream(pin);
+ out = new ChunkedOutputStream(pout, bufferSize, "test");
+ in = new ChunkedInputStream(pin, "test");
} else {
// Test behavior with non-chunked streams
out = new FilterOutputStream(pout);
@@ -209,7 +209,7 @@ public class TestChunkedInputStream {
chunkedStreams.values = values;
BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, false, false);
BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
- runTest(writer2, reader2, nonChunkedStreams);
+ runTest(writer2, reader2, chunkedStreams);
assertTrue(reader2.allValuesRead);
assertTrue(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
assertNull(writer2.getError());
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
index dbe90d6..fe09f58 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,25 +41,19 @@ public class ChannelOutputStream extends OutputStream {
private ByteBuf buf;
private byte[] singleByte = new byte[1];
private boolean closed = false;
- private final Object writeMonitor = new Object();
private final int maxPendingWrites;
- private volatile int pendingWrites = 0;
+ private final Semaphore writeResources;
private ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
-
- pendingWrites--;
+ writeResources.release();
if (future.isCancelled()) {
LOG.error("Write cancelled on ID " + id);
} else if (!future.isSuccess()) {
LOG.error("Write error on ID " + id, future.cause());
}
-
- synchronized (writeMonitor) {
- writeMonitor.notifyAll();
- }
}
};
@@ -79,6 +74,7 @@ public class ChannelOutputStream extends OutputStream {
this.bufSize = bufSize;
this.buf = chc.alloc().buffer(bufSize);
this.maxPendingWrites = maxOutstandingWrites;
+ this.writeResources = new Semaphore(maxPendingWrites);
}
@Override
@@ -126,13 +122,13 @@ public class ChannelOutputStream extends OutputStream {
try {
flush();
} catch (IOException err) {
- LOG.error("Error flushing stream before close", err);
+ LOG.error("Error flushing stream before close on " + id, err);
}
closed = true;
// Wait for all writes to finish before we actually close.
- waitForWritesToFinish(0);
+ takeWriteResources(maxPendingWrites);
try {
chc.close().addListener(closeListener);
@@ -144,16 +140,12 @@ public class ChannelOutputStream extends OutputStream {
}
}
- private void waitForWritesToFinish(int desiredWriteCount) throws IOException {
- synchronized (writeMonitor) {
- // to prevent spurious wake up
- while (pendingWrites > desiredWriteCount) {
- try {
- writeMonitor.wait();
- } catch (InterruptedException ie) {
- throw new IOException("Interrupted while waiting for write operations to finish for " + id);
- }
- }
+ // Attempt to acquire write resources, waiting if they are not available.
+ private void takeWriteResources(int numResources) throws IOException {
+ try {
+ writeResources.acquire(numResources);
+ } catch (InterruptedException ie) {
+ throw new IOException("Interrupted while waiting for write resources for " + id);
}
}
@@ -162,10 +154,7 @@ public class ChannelOutputStream extends OutputStream {
throw new IOException("Already closed: " + id);
}
- // Wait if we have exceeded our max pending write count
- waitForWritesToFinish(maxPendingWrites - 1);
-
- pendingWrites++;
+ takeWriteResources(1);
chc.writeAndFlush(buf.copy()).addListener(writeListener);
buf.clear();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index c732ba1..586006b 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -198,10 +198,10 @@ public class LlapOutputFormatService {
int maxPendingWrites = HiveConf.getIntVar(conf,
HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
@SuppressWarnings("rawtypes")
- LlapRecordWriter writer = new LlapRecordWriter(
+ LlapRecordWriter writer = new LlapRecordWriter(id,
new ChunkedOutputStream(
new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
- sendBufferSize));
+ sendBufferSize, id));
boolean isFailed = true;
synchronized (lock) {
if (!writers.containsKey(id)) {
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
index b632fae..02588cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
@@ -33,15 +33,17 @@ public class LlapRecordWriter<K extends Writable, V extends WritableComparable>
implements RecordWriter<K,V> {
public static final Logger LOG = LoggerFactory.getLogger(LlapRecordWriter.class);
+ String id;
DataOutputStream dos;
- public LlapRecordWriter(OutputStream out) {
+ public LlapRecordWriter(String id, OutputStream out) {
+ this.id = id;
dos = new DataOutputStream(out);
}
@Override
public void close(Reporter reporter) throws IOException {
- LOG.info("CLOSING the record writer output stream");
+ LOG.info("CLOSING the record writer output stream for " + id);
dos.close();
}