Posted to commits@hive.apache.org by jd...@apache.org on 2017/05/11 17:32:14 UTC
hive git commit: HIVE-16637: Improve end-of-data checking for LLAP input format (Jason Dere, reviewed by Sergey Shelukhin)
Repository: hive
Updated Branches:
refs/heads/master 8e0087942 -> 162f59242
HIVE-16637: Improve end-of-data checking for LLAP input format (Jason Dere, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/162f5924
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/162f5924
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/162f5924
Branch: refs/heads/master
Commit: 162f592425ce2242ed9be7cd805aa1d5e37b3c11
Parents: 8e00879
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu May 11 10:31:21 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu May 11 10:31:21 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/llap/LlapBaseRecordReader.java | 48 +++-
.../hadoop/hive/llap/io/ChunkedInputStream.java | 97 +++++++
.../hive/llap/io/ChunkedOutputStream.java | 103 ++++++++
.../hive/llap/io/TestChunkedInputStream.java | 254 +++++++++++++++++++
.../hive/llap/LlapOutputFormatService.java | 5 +-
5 files changed, 493 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
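For context, the patch frames LLAP output as a series of length-prefixed chunks terminated by a zero-length chunk, so the reader can tell a clean end of data apart from a truncated stream. Below is a minimal standalone sketch of that wire format using only java.io; it is illustrative only and not part of the commit (the ChunkFramingDemo class name is hypothetical).

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ChunkFramingDemo {
  public static void main(String[] args) throws IOException {
    // Write: <chunk size><chunk bytes>... terminated by a 0-length chunk.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    byte[] payload = "hello".getBytes("UTF-8");
    out.writeInt(payload.length); // chunk size
    out.write(payload);           // chunk bytes
    out.writeInt(0);              // 0-length chunk: end-of-data indicator

    // Read: consume chunks until the 0-length terminator.
    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray()));
    int len;
    while ((len = in.readInt()) > 0) {
      byte[] chunk = new byte[len];
      in.readFully(chunk);
      System.out.println(new String(chunk, "UTF-8"));
    }
  }
}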
http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 59dec1b..7fff147 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.llap;
+import java.io.BufferedInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.NullWritable;
@@ -46,6 +48,7 @@ import org.slf4j.LoggerFactory;
public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
+ protected final ChunkedInputStream cin;
protected final DataInputStream din;
protected final Schema schema;
protected final Class<V> clazz;
@@ -58,7 +61,9 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
public LlapBaseRecordReader(InputStream in, Schema schema,
Class<V> clazz, JobConf job, Closeable client, Closeable socket) {
- din = new DataInputStream(in);
+ this.cin = new ChunkedInputStream(in); // Save so we can verify end of stream
+ // We need mark support - wrap with BufferedInputStream.
+ din = new DataInputStream(new BufferedInputStream(cin));
this.schema = schema;
this.clazz = clazz;
this.readerThread = Thread.currentThread();
@@ -129,19 +134,27 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
// Need a way to know what thread to interrupt, since this is a blocking thread.
setReaderThread(Thread.currentThread());
- value.readFields(din);
- return true;
- } catch (EOFException eof) {
- // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
- ReaderEvent event = getReaderEvent();
- switch (event.getEventType()) {
- case DONE:
- break;
- default:
- throw new IOException("Expected reader event with done status, but got "
- + event.getEventType() + " with message " + event.getMessage());
+ if (hasInput()) {
+ value.readFields(din);
+ return true;
+ } else {
+ // End of input. Confirm we got end of stream indicator from server,
+ // as well as DONE status from fragment execution.
+ if (!cin.isEndOfData()) {
+ throw new IOException("Hit end of input, but did not find expected end of data indicator");
+ }
+
+ // There should be a reader event available, or coming soon, so okay to be blocking call.
+ ReaderEvent event = getReaderEvent();
+ switch (event.getEventType()) {
+ case DONE:
+ break;
+ default:
+ throw new IOException("Expected reader event with done status, but got "
+ + event.getEventType() + " with message " + event.getMessage());
+ }
+ return false;
}
- return false;
} catch (IOException io) {
if (Thread.interrupted()) {
// Either we were interrupted by one of:
@@ -232,6 +245,15 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
}
}
+ protected boolean hasInput() throws IOException {
+ din.mark(1);
+ if (din.read() >= 0) {
+ din.reset();
+ return true;
+ }
+ return false;
+ }
+
protected ReaderEvent getReaderEvent() throws IOException {
try {
ReaderEvent event = readerEvents.poll(timeout, TimeUnit.MILLISECONDS);
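The hasInput() helper above peeks one byte via mark()/read()/reset(), which is why the constructor wraps the chunked stream in a BufferedInputStream: DataInputStream alone does not guarantee mark support. A standalone sketch of that peek pattern (PeekDemo is a hypothetical name, not part of the commit):

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PeekDemo {
  public static void main(String[] args) throws IOException {
    // BufferedInputStream supplies the mark/reset support the peek relies on.
    InputStream in = new BufferedInputStream(
        new ByteArrayInputStream(new byte[]{42}));
    in.mark(1);                        // remember the current position
    boolean hasInput = in.read() >= 0; // try to read one byte
    if (hasInput) {
      in.reset();                      // un-consume the peeked byte
    }
    System.out.println(hasInput + " " + in.read()); // prints: true 42
  }
}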
http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
new file mode 100644
index 0000000..4e1c65f
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Data is expected to be a series of data chunks in the form <chunk size><chunk bytes><chunk size><chunk bytes>
+// The final data chunk should be a 0-length chunk which will indicate end of input.
+public class ChunkedInputStream extends InputStream {
+
+ static final private Logger LOG = LoggerFactory.getLogger(ChunkedInputStream.class);
+
+ private DataInputStream din;
+ private int unreadBytes = 0; // Bytes remaining in the current chunk of data
+ private byte[] singleByte = new byte[1];
+ private boolean endOfData = false;
+
+ public ChunkedInputStream(InputStream in) {
+ din = new DataInputStream(in);
+ }
+
+ @Override
+ public void close() throws IOException {
+ din.close();
+ }
+
+ @Override
+ public int read() throws IOException {
+ int bytesRead = read(singleByte, 0, 1);
+ return (bytesRead == -1) ? -1 : (singleByte[0] & 0xff); // mask so bytes >= 0x80 are not misread as EOF
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int bytesRead = 0;
+
+ if (len < 0) {
+ throw new IllegalArgumentException("Negative read length");
+ } else if (len == 0) {
+ return 0;
+ }
+
+ // If there is a current unread chunk, read from that, or else get the next chunk.
+ if (unreadBytes == 0) {
+ try {
+ // Find the next chunk size
+ unreadBytes = din.readInt();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Chunk size " + unreadBytes);
+ }
+ if (unreadBytes == 0) {
+ LOG.debug("Hit end of data");
+ endOfData = true;
+ return -1;
+ }
+ } catch (IOException err) {
+ throw new IOException("Error while attempting to read chunk length", err);
+ }
+ }
+
+ int bytesToRead = Math.min(len, unreadBytes);
+ try {
+ din.readFully(b, off, bytesToRead);
+ } catch (IOException err) {
+ throw new IOException("Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
+ }
+ unreadBytes -= bytesToRead;
+ bytesRead += bytesToRead;
+
+ return bytesRead;
+ }
+
+ public boolean isEndOfData() {
+ return endOfData;
+ }
+}
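A minimal usage sketch for the new class (illustrative, not part of the commit; ChunkedInputDemo is a hypothetical name): hand-build a framed payload, drain the stream, then confirm the end-of-data indicator was seen — the same check LlapBaseRecordReader now performs.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.llap.io.ChunkedInputStream;

public class ChunkedInputDemo {
  public static void main(String[] args) throws IOException {
    // Hand-build a framed payload: one 3-byte chunk plus the 0-length terminator.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dout = new DataOutputStream(bos);
    dout.writeInt(3);
    dout.write(new byte[]{1, 2, 3});
    dout.writeInt(0);

    ChunkedInputStream cin = new ChunkedInputStream(
        new ByteArrayInputStream(bos.toByteArray()));
    byte[] buf = new byte[16];
    int n;
    while ((n = cin.read(buf, 0, buf.length)) != -1) {
      System.out.println("read " + n + " bytes");
    }
    // true only if the 0-length end-of-data chunk was actually received.
    System.out.println("end of data: " + cin.isEndOfData());
  }
}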
http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
new file mode 100644
index 0000000..31815d5
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Writes data out as a series of chunks in the form <chunk size><chunk bytes><chunk size><chunk bytes>
+// Closing the output stream will send a final 0-length chunk which will indicate end of input.
+public class ChunkedOutputStream extends OutputStream {
+
+ static final private Logger LOG = LoggerFactory.getLogger(ChunkedOutputStream.class);
+
+ private DataOutputStream dout;
+ private byte[] singleByte = new byte[1];
+ private byte[] buffer;
+ private int bufPos = 0;
+
+ public ChunkedOutputStream(OutputStream out, int bufSize) {
+ if (bufSize <= 0) {
+ throw new IllegalArgumentException("Positive bufSize required, was " + bufSize);
+ }
+ buffer = new byte[bufSize];
+ dout = new DataOutputStream(out);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ singleByte[0] = (byte) b;
+ write(singleByte, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ int bytesWritten = 0;
+ while (bytesWritten < len) {
+ // Copy the data to the buffer
+ int bytesToWrite = Math.min(len - bytesWritten, buffer.length - bufPos);
+ System.arraycopy(b, off + bytesWritten, buffer, bufPos, bytesToWrite);
+ bytesWritten += bytesToWrite;
+ bufPos += bytesToWrite;
+
+ // If we've filled the buffer, write it out
+ if (bufPos == buffer.length) {
+ writeChunk();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+
+ // Write final 0-length chunk
+ writeChunk();
+
+ LOG.debug("ChunkedOutputStream: Closing underlying output stream.");
+ dout.close();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ // Write any remaining bytes to the out stream.
+ if (bufPos > 0) {
+ writeChunk();
+ dout.flush();
+ }
+ }
+
+ private void writeChunk() throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing chunk of size " + bufPos);
+ }
+
+ // First write chunk length
+ dout.writeInt(bufPos);
+
+ // Then write chunk bytes
+ dout.write(buffer, 0, bufPos);
+
+ bufPos = 0; // reset buffer
+ }
+}
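Pairing the two classes gives a simple round trip; a small buffer size forces the payload across multiple chunks, and close() on the output side writes the terminator that isEndOfData() checks for. A sketch under those assumptions (ChunkRoundTripDemo is hypothetical, not part of the commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;

public class ChunkRoundTripDemo {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    // Buffer of 8 bytes splits the 18-byte payload into chunks of 8, 8, 2.
    ChunkedOutputStream out = new ChunkedOutputStream(sink, 8);
    byte[] data = "chunked round trip".getBytes("UTF-8");
    out.write(data, 0, data.length);
    out.close(); // flushes and writes the 0-length end-of-data chunk

    ChunkedInputStream in = new ChunkedInputStream(
        new ByteArrayInputStream(sink.toByteArray()));
    ByteArrayOutputStream result = new ByteArrayOutputStream();
    int b;
    while ((b = in.read()) != -1) {
      result.write(b);
    }
    System.out.println(result.toString("UTF-8"));         // chunked round trip
    System.out.println("clean end: " + in.isEndOfData()); // true
  }
}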
http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
new file mode 100644
index 0000000..47209c2
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hive.common.type.RandomTypeUtil;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestChunkedInputStream {
+
+ static int bufferSize = 128;
+ static Random rand = new Random();
+ static String alphabet = "abcdefghijklmnopqrstuvwxyz";
+
+ static class StreamTester {
+ Exception error = null;
+
+ public Exception getError() {
+ return error;
+ }
+
+ public void setError(Exception error) {
+ this.error = error;
+ }
+ }
+
+ // Test class to write a series of values to the designated output stream
+ static class BasicUsageWriter extends StreamTester implements Runnable {
+ TestStreams streams;
+ boolean flushCout;
+ boolean closePoutEarly;
+
+ public BasicUsageWriter(TestStreams streams, boolean flushCout, boolean closePoutEarly) {
+ this.streams = streams;
+ this.flushCout = flushCout;
+ this.closePoutEarly = closePoutEarly;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Write the items to the output stream.
+ for (byte[] value: streams.values) {
+ streams.out.write(value, 0, value.length);
+ }
+
+ if (flushCout) {
+ streams.out.flush();
+ }
+ if (closePoutEarly) {
+ // Close the inner output stream before closing the outer output stream.
+ // For chunked output this means we don't write end-of-data indicator.
+ streams.pout.close();
+ }
+ // This will throw error if we close pout early.
+ streams.out.close();
+ } catch (Exception err) {
+ err.printStackTrace();
+ this.error = err;
+ }
+ }
+ }
+
+ // Test class to read a series of values from the designated input stream
+ static class BasicUsageReader extends StreamTester implements Runnable {
+ TestStreams streams;
+ boolean allValuesRead = false;
+
+ public BasicUsageReader(TestStreams streams) {
+ this.streams = streams;
+ }
+
+ // Continue reading from the input stream until the desired number of bytes has been read
+ void readFully(InputStream in, byte[] readValue, int numBytes) throws IOException {
+ int bytesRead = 0;
+ while (bytesRead < numBytes) {
+ int read = in.read(readValue, bytesRead, numBytes - bytesRead);
+ if (read <= 0) {
+ throw new IOException("Unexpected read length " + read);
+ }
+ bytesRead += read;
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Read the items from the input stream and confirm they match
+ for (byte[] value : streams.values) {
+ byte[] readValue = new byte[value.length];
+ readFully(streams.in, readValue, readValue.length);
+ assertArrayEquals(value, readValue);
+ }
+
+ allValuesRead = true;
+
+ // Check that the output is done
+ assertEquals(-1, streams.in.read());
+ } catch (Exception err) {
+ err.printStackTrace();
+ this.error = err;
+ }
+ }
+ }
+
+ static class MyFilterInputStream extends FilterInputStream {
+ public MyFilterInputStream(InputStream in) {
+ super(in);
+ }
+ }
+
+ // Helper class to set up a ChunkedInput/Output stream for testing
+ static class TestStreams {
+ PipedOutputStream pout;
+ OutputStream out;
+ PipedInputStream pin;
+ InputStream in;
+ List<byte[]> values;
+
+ public TestStreams(boolean useChunkedStream) throws Exception {
+ pout = new PipedOutputStream();
+ pin = new PipedInputStream(pout);
+ if (useChunkedStream) {
+ out = new ChunkedOutputStream(pout, bufferSize);
+ in = new ChunkedInputStream(pin);
+ } else {
+ // Test behavior with non-chunked streams
+ out = new FilterOutputStream(pout);
+ in = new MyFilterInputStream(pin);
+ }
+ }
+
+ public void close() {
+ try {
+ pout.close();
+ } catch (Exception err) {
+ // ignore
+ }
+
+ try {
+ pin.close();
+ } catch (Exception err) {
+ // ignore
+ }
+ }
+ }
+
+ static void runTest(Runnable writer, Runnable reader, TestStreams streams) throws Exception {
+ Thread writerThread = new Thread(writer);
+ Thread readerThread = new Thread(reader);
+
+ writerThread.start();
+ readerThread.start();
+
+ writerThread.join();
+ readerThread.join();
+ }
+
+ @Test
+ public void testBasicUsage() throws Exception {
+ List<byte[]> values = Arrays.asList(
+ new byte[]{(byte) 1},
+ new byte[]{(byte) 2},
+ RandomTypeUtil.getRandString(rand, alphabet, 99).getBytes(),
+ RandomTypeUtil.getRandString(rand, alphabet, 1024).getBytes()
+ );
+
+ // Try the basic test with non-chunked stream
+ TestStreams nonChunkedStreams = new TestStreams(false);
+ nonChunkedStreams.values = values;
+ BasicUsageWriter writer1 = new BasicUsageWriter(nonChunkedStreams, false, false);
+ BasicUsageReader reader1 = new BasicUsageReader(nonChunkedStreams);
+ runTest(writer1, reader1, nonChunkedStreams);
+ assertTrue(reader1.allValuesRead);
+ assertNull(writer1.getError());
+ assertNull(reader1.getError());
+
+ // Try with chunked streams
+ TestStreams chunkedStreams = new TestStreams(true);
+ chunkedStreams.values = values;
+ BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, false, false);
+ BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
+ runTest(writer2, reader2, chunkedStreams);
+ assertTrue(reader2.allValuesRead);
+ assertTrue(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
+ assertNull(writer2.getError());
+ assertNull(reader2.getError());
+ }
+
+ @Test
+ public void testAbruptlyClosedOutput() throws Exception {
+ List<byte[]> values = Arrays.asList(
+ new byte[]{(byte) 1},
+ new byte[]{(byte) 2},
+ RandomTypeUtil.getRandString(rand, alphabet, 99).getBytes(),
+ RandomTypeUtil.getRandString(rand, alphabet, 1024).getBytes()
+ );
+
+ // Close the PipedOutputStream before we close the outermost OutputStream.
+
+ // Try non-chunked stream. There should be no issues assuming we flushed the streams before closing.
+ TestStreams nonChunkedStreams = new TestStreams(false);
+ nonChunkedStreams.values = values;
+ BasicUsageWriter writer1 = new BasicUsageWriter(nonChunkedStreams, true, true);
+ BasicUsageReader reader1 = new BasicUsageReader(nonChunkedStreams);
+ runTest(writer1, reader1, nonChunkedStreams);
+ assertTrue(reader1.allValuesRead);
+ assertNull(writer1.getError());
+ assertNull(reader1.getError());
+
+ // Try with chunked stream. Here the chunked output didn't get a chance to write the end-of-data
+ // indicator, so the chunked input does not know to stop reading.
+ TestStreams chunkedStreams = new TestStreams(true);
+ chunkedStreams.values = values;
+ BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, true, true);
+ BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
+ runTest(writer2, reader2, chunkedStreams);
+ assertTrue(reader2.allValuesRead);
+ assertFalse(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
+ // Closing the chunked output stream early gives an error
+ assertNotNull(writer2.getError());
+ // In this case we should expect the test to have failed at the very last read() check.
+ assertNotNull(reader2.getError());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 0619b79..c732ba1 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
+import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;
import org.apache.hadoop.hive.llap.security.SecretManager;
import com.google.common.base.Preconditions;
@@ -198,7 +199,9 @@ public class LlapOutputFormatService {
HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
@SuppressWarnings("rawtypes")
LlapRecordWriter writer = new LlapRecordWriter(
- new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites));
+ new ChunkedOutputStream(
+ new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
+ sendBufferSize));
boolean isFailed = true;
synchronized (lock) {
if (!writers.containsKey(id)) {