Posted to commits@hive.apache.org by jd...@apache.org on 2017/07/18 20:49:20 UTC

hive git commit: HIVE-17091: "Timed out getting readerEvents" error from external LLAP client (Jason Dere, reviewed by Siddharth Seth)

Repository: hive
Updated Branches:
  refs/heads/master 358585ba6 -> d4c496bf5


HIVE-17091: "Timed out getting readerEvents" error from external LLAP client (Jason Dere, reviewed by Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d4c496bf
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d4c496bf
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d4c496bf

Branch: refs/heads/master
Commit: d4c496bf5e8c691bbc130e11a854321fb6a25e3e
Parents: 358585b
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Jul 18 13:47:52 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Jul 18 13:47:52 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapBaseRecordReader.java  | 14 ++++----
 .../ext/LlapTaskUmbilicalExternalClient.java    | 12 +++++++
 .../hadoop/hive/llap/io/ChunkedInputStream.java | 16 +++++----
 .../hive/llap/io/ChunkedOutputStream.java       |  9 +++--
 .../hive/llap/io/TestChunkedInputStream.java    |  6 ++--
 .../hadoop/hive/llap/ChannelOutputStream.java   | 37 +++++++-------------
 .../hive/llap/LlapOutputFormatService.java      |  4 +--
 .../hadoop/hive/llap/LlapRecordWriter.java      |  6 ++--
 8 files changed, 56 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index cb38839..af3db8c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.llap;
 
+import com.google.common.base.Preconditions;
+
 import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.EOFException;
@@ -55,21 +57,19 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
 
   protected Thread readerThread = null;
   protected final LinkedBlockingQueue<ReaderEvent> readerEvents = new LinkedBlockingQueue<ReaderEvent>();
-  protected final long timeout;
   protected final Closeable client;
   private final Closeable socket;
   private boolean closed = false;
 
   public LlapBaseRecordReader(InputStream in, Schema schema,
       Class<V> clazz, JobConf job, Closeable client, Closeable socket) {
-    this.cin = new ChunkedInputStream(in);  // Save so we can verify end of stream
+    String clientId = (client == null ? "" : client.toString());
+    this.cin = new ChunkedInputStream(in, clientId);  // Save so we can verify end of stream
     // We need mark support - wrap with BufferedInputStream.
     din = new DataInputStream(new BufferedInputStream(cin));
     this.schema = schema;
     this.clazz = clazz;
     this.readerThread = Thread.currentThread();
-    this.timeout = 3 * HiveConf.getTimeVar(job,
-        HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
     this.client = client;
     this.socket = socket;
   }
@@ -273,10 +273,8 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
 
   protected ReaderEvent getReaderEvent() throws IOException {
     try {
-      ReaderEvent event = readerEvents.poll(timeout, TimeUnit.MILLISECONDS);
-      if (event == null) {
-        throw new IOException("Timed out getting readerEvents");
-      }
+      ReaderEvent event = readerEvents.take();
+      Preconditions.checkNotNull(event);
       return event;
     } catch (InterruptedException ie) {
       throw new RuntimeException("Interrupted while getting readerEvents, not expected: " + ie.getMessage(), ie);

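For context on the hunk above: the reader previously gave up after a fixed multiple of the AM liveness connection timeout, which is what surfaced as the "Timed out getting readerEvents" error on long-running queries. The new code blocks indefinitely on the event queue and leaves liveness detection to the umbilical client's heartbeat check (see HeartbeatCheckTask in the next file). A minimal sketch of the resulting pattern, with illustrative names:

    import java.io.IOException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Sketch: blocking event consumer. No reader-side timeout; a heartbeat
    // monitor elsewhere is expected to enqueue an error event if the
    // producer dies, so take() is guaranteed to eventually return.
    final class EventConsumer<E> {
      private final BlockingQueue<E> events = new LinkedBlockingQueue<>();

      void post(E event) {
        events.add(event);
      }

      E next() throws IOException {
        try {
          return events.take();  // blocks; never returns null
        } catch (InterruptedException ie) {
          throw new IOException("Interrupted while waiting for event", ie);
        }
      }
    }
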
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
index 4304b52..ee7d0d3 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/ext/LlapTaskUmbilicalExternalClient.java
@@ -320,6 +320,18 @@ public class LlapTaskUmbilicalExternalClient implements Closeable {
     this.requestInfo.lastHeartbeat.set(lastHeartbeat);
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("LlapTaskUmbilicalExternalClient");
+    if (requestInfo != null) {
+      sb.append("(");
+      sb.append(requestInfo.taskAttemptId);
+      sb.append(")");
+    }
+    return sb.toString();
+  }
+
   // Periodic task to time out submitted tasks that have not been updated with umbilical heartbeat.
   private static class HeartbeatCheckTask implements Runnable {
     LlapTaskUmbilicalExternalImpl umbilicalImpl;

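The toString() added above is what LlapBaseRecordReader now feeds into ChunkedInputStream as the stream id, presumably so log lines can be correlated with the owning client. Assuming a Tez-style task attempt id, a reader-side debug line would then look roughly like this (the attempt id value is illustrative):

    LlapTaskUmbilicalExternalClient(attempt_1500000000000_0001_1_00_000000_0): Chunk size 4096
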
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
index 4e1c65f..1c97403 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
@@ -35,13 +35,17 @@ public class ChunkedInputStream extends InputStream {
   private int unreadBytes = 0;  // Bytes remaining in the current chunk of data
   private byte[] singleByte = new byte[1];
   private boolean endOfData = false;
+  private String id;
 
-  public ChunkedInputStream(InputStream in) {
+  public ChunkedInputStream(InputStream in, String id) {
     din = new DataInputStream(in);
+    this.id = id;
+    LOG.debug("Creating chunked input for {}", id);
   }
 
   @Override
   public void close() throws IOException {
+    LOG.debug("{}: Closing chunked input.", id);
     din.close();
   }
 
@@ -56,7 +60,7 @@ public class ChunkedInputStream extends InputStream {
     int bytesRead = 0;
 
     if (len < 0) {
-      throw new IllegalArgumentException("Negative read length");
+      throw new IllegalArgumentException(id + ": Negative read length");
     } else if (len == 0) {
       return 0;
     }
@@ -67,15 +71,15 @@ public class ChunkedInputStream extends InputStream {
         // Find the next chunk size
         unreadBytes = din.readInt();
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Chunk size " + unreadBytes);
+          LOG.debug("{}: Chunk size {}", id, unreadBytes);
         }
         if (unreadBytes == 0) {
-          LOG.debug("Hit end of data");
+          LOG.debug("{}: Hit end of data", id);
           endOfData = true;
           return -1;
         }
       } catch (IOException err) {
-        throw new IOException("Error while attempting to read chunk length", err);
+        throw new IOException(id + ": Error while attempting to read chunk length", err);
       }
     }
 
@@ -83,7 +87,7 @@ public class ChunkedInputStream extends InputStream {
     try {
       din.readFully(b, off, bytesToRead);
     } catch (IOException err) {
-      throw new IOException("Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
+      throw new IOException(id + ": Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
     }
     unreadBytes -= bytesToRead;
     bytesRead += bytesToRead;

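As the readInt()/readFully() calls above show, the wire format is plain length-prefixed chunking: a 4-byte big-endian length, then that many payload bytes, with a zero-length header marking end of data. A self-contained decoder sketch of the same framing (illustrative, not the Hive class):

    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    final class ChunkDecoder {
      // Drains length-prefixed chunks until the 0-length terminator.
      static byte[] readAllChunks(InputStream in) throws IOException {
        DataInputStream din = new DataInputStream(in);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[8192];
        int chunkLen;
        while ((chunkLen = din.readInt()) > 0) {  // 0 means end of data
          int remaining = chunkLen;
          while (remaining > 0) {
            int n = Math.min(remaining, buf.length);
            din.readFully(buf, 0, n);
            out.write(buf, 0, n);
            remaining -= n;
          }
        }
        return out.toByteArray();
      }
    }
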
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
index 31815d5..3a82915 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
@@ -35,13 +35,16 @@ public class ChunkedOutputStream extends OutputStream {
   private byte[] singleByte = new byte[1];
   private byte[] buffer;
   private int bufPos = 0;
+  private String id;
 
-  public ChunkedOutputStream(OutputStream out, int bufSize) {
+  public ChunkedOutputStream(OutputStream out, int bufSize, String id) {
+    LOG.debug("Creating chunked input stream: {}", id);
     if (bufSize <= 0) {
       throw new IllegalArgumentException("Positive bufSize required, was " + bufSize);
     }
     buffer = new byte[bufSize];
     dout = new DataOutputStream(out);
+    this.id = id;
   }
 
   @Override
@@ -74,7 +77,7 @@ public class ChunkedOutputStream extends OutputStream {
     // Write final 0-length chunk
     writeChunk();
 
-    LOG.debug("ChunkedOutputStream: Closing underlying output stream.");
+    LOG.debug("{}: Closing underlying output stream.", id);
     dout.close();
   }
 
@@ -89,7 +92,7 @@ public class ChunkedOutputStream extends OutputStream {
 
   private void writeChunk() throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Writing chunk of size " + bufPos);
+      LOG.debug("{}: Writing chunk of size {}", id, bufPos);
     }
 
     // First write chunk length

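The encoder side mirrors the decoder: writeChunk() emits a length prefix followed by the buffered bytes, and close() terminates the stream with a 0-length chunk. A minimal counterpart sketch (illustrative names):

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;

    final class ChunkEncoder {
      private final DataOutputStream dout;

      ChunkEncoder(OutputStream out) {
        this.dout = new DataOutputStream(out);
      }

      void writeChunk(byte[] b, int off, int len) throws IOException {
        dout.writeInt(len);  // 4-byte length prefix
        dout.write(b, off, len);
      }

      void finish() throws IOException {
        dout.writeInt(0);    // end-of-data marker expected by the decoder
        dout.flush();
      }
    }
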
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
index 47209c2..ebbf3a4 100644
--- a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
@@ -150,8 +150,8 @@ public class TestChunkedInputStream {
       pout = new PipedOutputStream();
       pin = new PipedInputStream(pout);
       if (useChunkedStream) {
-        out = new ChunkedOutputStream(pout, bufferSize);
-        in = new ChunkedInputStream(pin);
+        out = new ChunkedOutputStream(pout, bufferSize, "test");
+        in = new ChunkedInputStream(pin, "test");
       } else {
         // Test behavior with non-chunked streams
         out = new FilterOutputStream(pout);
@@ -209,7 +209,7 @@ public class TestChunkedInputStream {
     chunkedStreams.values = values;
     BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, false, false);
     BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
-    runTest(writer2, reader2, nonChunkedStreams);
+    runTest(writer2, reader2, chunkedStreams);
     assertTrue(reader2.allValuesRead);
     assertTrue(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
     assertNull(writer2.getError());

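A round-trip usage sketch of the two classes with their new id parameter, mirroring the piped-stream setup in this test (the "example" id is arbitrary):

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
    import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;

    public class ChunkedRoundTrip {
      public static void main(String[] args) throws IOException {
        PipedOutputStream pout = new PipedOutputStream();
        PipedInputStream pin = new PipedInputStream(pout);
        ChunkedOutputStream out = new ChunkedOutputStream(pout, 1024, "example");
        ChunkedInputStream in = new ChunkedInputStream(pin, "example");

        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        new Thread(() -> {
          try {
            out.write(payload);
            out.close();  // flushes and writes the final 0-length chunk
          } catch (IOException ignored) {
          }
        }).start();

        byte[] buf = new byte[payload.length];
        new DataInputStream(in).readFully(buf);
        System.out.println(in.read() == -1);  // true: end of data reached
      }
    }
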
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
index dbe90d6..fe09f58 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.Semaphore;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,25 +41,19 @@ public class ChannelOutputStream extends OutputStream {
   private ByteBuf buf;
   private byte[] singleByte = new byte[1];
   private boolean closed = false;
-  private final Object writeMonitor = new Object();
   private final int maxPendingWrites;
-  private volatile int pendingWrites = 0;
+  private final Semaphore writeResources;
 
   private ChannelFutureListener writeListener = new ChannelFutureListener() {
     @Override
     public void operationComplete(ChannelFuture future) {
-
-      pendingWrites--;
+      writeResources.release();
 
       if (future.isCancelled()) {
         LOG.error("Write cancelled on ID " + id);
       } else if (!future.isSuccess()) {
         LOG.error("Write error on ID " + id, future.cause());
       }
-
-      synchronized (writeMonitor) {
-        writeMonitor.notifyAll();
-      }
     }
   };
 
@@ -79,6 +74,7 @@ public class ChannelOutputStream extends OutputStream {
     this.bufSize = bufSize;
     this.buf = chc.alloc().buffer(bufSize);
     this.maxPendingWrites = maxOutstandingWrites;
+    this.writeResources = new Semaphore(maxPendingWrites);
   }
 
   @Override
@@ -126,13 +122,13 @@ public class ChannelOutputStream extends OutputStream {
     try {
       flush();
     } catch (IOException err) {
-      LOG.error("Error flushing stream before close", err);
+      LOG.error("Error flushing stream before close on " + id, err);
     }
 
     closed = true;
 
     // Wait for all writes to finish before we actually close.
-    waitForWritesToFinish(0);
+    takeWriteResources(maxPendingWrites);
 
     try {
       chc.close().addListener(closeListener);
@@ -144,16 +140,12 @@ public class ChannelOutputStream extends OutputStream {
     }
   }
 
-  private void waitForWritesToFinish(int desiredWriteCount) throws IOException {
-    synchronized (writeMonitor) {
-      // to prevent spurious wake up
-      while (pendingWrites > desiredWriteCount) {
-        try {
-          writeMonitor.wait();
-        } catch (InterruptedException ie) {
-          throw new IOException("Interrupted while waiting for write operations to finish for " + id);
-        }
-      }
+  // Attempt to acquire write resources, waiting if they are not available.
+  private void takeWriteResources(int numResources) throws IOException {
+    try {
+      writeResources.acquire(numResources);
+    } catch (InterruptedException ie) {
+      throw new IOException("Interrupted while waiting for write resources for " + id);
     }
   }
 
@@ -162,10 +154,7 @@ public class ChannelOutputStream extends OutputStream {
       throw new IOException("Already closed: " + id);
     }
 
-    // Wait if we have exceeded our max pending write count
-    waitForWritesToFinish(maxPendingWrites - 1);
-
-    pendingWrites++;
+    takeWriteResources(1);
     chc.writeAndFlush(buf.copy()).addListener(writeListener);
     buf.clear();
   }

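The rewrite above swaps a monitor/wait-loop counter for the standard bounded-in-flight-writes pattern: acquire one semaphore permit per asynchronous write, release it from the completion listener, and acquire all maxPendingWrites permits on close so no write can still be in flight when the channel shuts. A generic sketch of that pattern, independent of Netty:

    import java.io.IOException;
    import java.util.concurrent.Semaphore;

    // Bounds the number of in-flight async writes without wait/notify.
    final class WriteThrottle {
      private final int maxPending;
      private final Semaphore permits;

      WriteThrottle(int maxPending) {
        this.maxPending = maxPending;
        this.permits = new Semaphore(maxPending);
      }

      void beforeWrite() throws IOException {
        acquire(1);  // blocks while maxPending writes are outstanding
      }

      void onWriteComplete() {
        permits.release();  // called from the async completion callback
      }

      void drainOnClose() throws IOException {
        acquire(maxPending);  // holding every permit => nothing in flight
      }

      private void acquire(int n) throws IOException {
        try {
          permits.acquire(n);
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted waiting for write permits", ie);
        }
      }
    }
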
http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index c732ba1..586006b 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -198,10 +198,10 @@ public class LlapOutputFormatService {
       int maxPendingWrites = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
       @SuppressWarnings("rawtypes")
-      LlapRecordWriter writer = new LlapRecordWriter(
+      LlapRecordWriter writer = new LlapRecordWriter(id,
           new ChunkedOutputStream(
               new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
-              sendBufferSize));
+              sendBufferSize, id));
       boolean isFailed = true;
       synchronized (lock) {
         if (!writers.containsKey(id)) {

http://git-wip-us.apache.org/repos/asf/hive/blob/d4c496bf/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
index b632fae..02588cb 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapRecordWriter.java
@@ -33,15 +33,17 @@ public class LlapRecordWriter<K extends Writable, V extends WritableComparable>
   implements RecordWriter<K,V> {
   public static final Logger LOG = LoggerFactory.getLogger(LlapRecordWriter.class);
 
+  String id;
   DataOutputStream dos;
 
-  public LlapRecordWriter(OutputStream out) {
+  public LlapRecordWriter(String id, OutputStream out) {
+    this.id = id;
     dos = new DataOutputStream(out);
   }
 
   @Override
   public void close(Reporter reporter) throws IOException {
-    LOG.info("CLOSING the record writer output stream");
+    LOG.info("CLOSING the record writer output stream for " + id);
     dos.close();
   }