Posted to commits@hive.apache.org by jd...@apache.org on 2016/03/03 21:58:53 UTC
hive git commit: HIVE-13202: LLAP: Replace use of ServerSocket with netty in LlapOutputFormatService
Repository: hive
Updated Branches:
refs/heads/llap d8a9531a7 -> 81b26df9e
HIVE-13202: LLAP: Replace use of ServerSocket with netty in LlapOutputFormatService
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/81b26df9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/81b26df9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/81b26df9
Branch: refs/heads/llap
Commit: 81b26df9ed00e9db671c57aece8e51bf62365e34
Parents: d8a9531
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Mar 3 12:57:43 2016 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Mar 3 12:57:43 2016 -0800
----------------------------------------------------------------------
.../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 74 +++++++--
.../hadoop/hive/llap/ChannelOutputStream.java | 141 +++++++++++++++++
.../hive/llap/LlapOutputFormatService.java | 155 ++++++++++++-------
.../hadoop/hive/ql/exec/FileSinkOperator.java | 12 +-
4 files changed, 308 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
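----------------------------------------------------------------------
The heart of the change: the blocking ServerSocket accept loop (run on a
single-thread executor) is replaced by a Netty ServerBootstrap whose child
pipeline frames a NUL-terminated id and hands the decoded string to a handler.
A minimal sketch of that server shape, assuming Netty 4.x; the port and the
256-byte frame cap here are placeholders, the real values come from HiveConf
in the code further below:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;

public class NettyServerSketch {
  public static void main(String[] args) throws Exception {
    EventLoopGroup group = new NioEventLoopGroup(1); // one NIO thread replaces the accept loop
    try {
      ServerBootstrap bootstrap = new ServerBootstrap()
          .group(group)
          .channel(NioServerSocketChannel.class)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
              // same framing idea as the patch: NUL-delimited id, decoded to a String
              ch.pipeline().addLast(
                  new DelimiterBasedFrameDecoder(256, Delimiters.nulDelimiter()),
                  new StringDecoder(),
                  new SimpleChannelInboundHandler<String>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, String id) {
                      System.out.println("client announced id: " + id);
                    }
                  });
            }
          });
      bootstrap.bind(15003).sync().channel().closeFuture().sync(); // 15003 is a placeholder port
    } finally {
      group.shutdownGracefully();
    }
  }
}
----------------------------------------------------------------------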
http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 88e2e55..98daab4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -37,6 +37,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
@@ -86,6 +88,7 @@ public class TestJdbcWithMiniLlap {
private static Path kvDataFilePath;
private static final String tmpDir = System.getProperty("test.tmp.dir");
+ private static HiveConf conf = null;
private Connection hs2Conn = null;
@BeforeClass
@@ -98,7 +101,7 @@ public class TestJdbcWithMiniLlap {
System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
}
- HiveConf conf = new HiveConf();
+ conf = new HiveConf();
conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
// Necessary for GetSplits()/LlapInputFormat,
// the config generated for the query fragment needs to include the MapWork
@@ -109,7 +112,7 @@ public class TestJdbcWithMiniLlap {
conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
+ "/llap-daemon-site.xml"));
- miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP, true);
+ miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
kvDataFilePath = new Path(dataFileDir, "kv1.txt");
@@ -160,21 +163,54 @@ public class TestJdbcWithMiniLlap {
stmt.close();
}
- @Test
- public void testLlapInputFormatEndToEnd() throws Exception {
- createTestTable("testtab1");
+
+ private static class TestTimerTask extends TimerTask {
+ private volatile boolean timedOut = false;
+ private Thread threadToInterrupt;
+
+ public TestTimerTask(Thread threadToInterrupt) {
+ this.threadToInterrupt = threadToInterrupt;
+ }
+
+ @Override
+ public void run() {
+ System.out.println("Test timed out!");
+ timedOut = true;
+ threadToInterrupt.interrupt();
+ }
+
+ public boolean isTimedOut() {
+ return timedOut;
+ }
+
+ public void setTimedOut(boolean timedOut) {
+ this.timedOut = timedOut;
+ }
+
+ }
+
+ private int getLlapIFRowCount(String query, int numSplits) throws Exception {
+ // Add a timer task to stop this test if it has not finished in a reasonable amount of time.
+ Timer timer = new Timer();
+ long delay = 30000;
+ TestTimerTask timerTask = new TestTimerTask(Thread.currentThread());
+ timer.schedule(timerTask, delay);
+
+ // Setup LlapInputFormat
String url = miniHS2.getJdbcURL();
String user = System.getProperty("user.name");
String pwd = user;
- String query = "select * from testtab1 where under_col = 0";
LlapInputFormat inputFormat = new LlapInputFormat(url, user, pwd, query);
- JobConf job = new JobConf();
- int numSplits = 1;
+
+ // Get splits
+ JobConf job = new JobConf(conf);
InputSplit[] splits = inputFormat.getSplits(job, numSplits);
- assert(splits.length > 0);
+ assertTrue(splits.length > 0);
+ // Fetch rows from splits
boolean first = true;
int rowCount = 0;
for (InputSplit split : splits) {
@@ -198,6 +234,26 @@ public class TestJdbcWithMiniLlap {
++rowCount;
}
}
+
+ timer.cancel();
+ assertFalse("Test timed out", timerTask.isTimedOut());
+
+ return rowCount;
+ }
+
+ @Test
+ public void testLlapInputFormatEndToEnd() throws Exception {
+ createTestTable("testtab1");
+
+ int rowCount;
+
+ String query = "select * from testtab1 where under_col = 0";
+ rowCount = getLlapIFRowCount(query, 1);
assertEquals(3, rowCount);
+
+ // Try empty rows query
+ query = "select * from testtab1 where true = false";
+ rowCount = getLlapIFRowCount(query, 1);
+ assertEquals(0, rowCount);
}
}
\ No newline at end of file
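----------------------------------------------------------------------
The test plumbing above guards getLlapIFRowCount() with a TimerTask that
interrupts the test thread if the fetch hangs. A minimal standalone sketch of
that watchdog pattern follows; the names (WatchdogSketch, WatchdogTask) and
the 1-second budget are illustrative, not part of the patch.

import java.util.Timer;
import java.util.TimerTask;

public class WatchdogSketch {
  static class WatchdogTask extends TimerTask {
    // volatile: the flag is written on the Timer thread and read on the test thread
    private volatile boolean timedOut = false;
    private final Thread target;

    WatchdogTask(Thread target) { this.target = target; }

    @Override
    public void run() {
      timedOut = true;
      target.interrupt(); // unblocks the guarded thread if it is stuck in I/O or a wait
    }

    boolean isTimedOut() { return timedOut; }
  }

  public static void main(String[] args) throws Exception {
    Timer timer = new Timer(true); // daemon timer so it cannot keep the JVM alive
    WatchdogTask watchdog = new WatchdogTask(Thread.currentThread());
    timer.schedule(watchdog, 1000); // 1-second budget for the guarded work
    try {
      Thread.sleep(5000); // stand-in for the real work; interrupted by the watchdog
    } catch (InterruptedException expected) {
      // expected when the watchdog fires first
    }
    timer.cancel();
    System.out.println("timed out: " + watchdog.isTimedOut());
  }
}
----------------------------------------------------------------------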
http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
new file mode 100644
index 0000000..e861791
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OutputStream to write to the Netty Channel
+ */
+public class ChannelOutputStream extends OutputStream {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputStream.class);
+
+ private ChannelHandlerContext chc;
+ private int bufSize;
+ private String id;
+ private ByteBuf buf;
+ private byte[] singleByte = new byte[1];
+ private boolean closed = false;
+
+ private ChannelFutureListener listener = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isCancelled()) {
+ LOG.error(id + " was cancelled");
+ } else if (!future.isSuccess()) {
+ LOG.error("Error on ID " + id, future.cause());
+ }
+ }
+ };
+
+ public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize) {
+ this.chc = chc;
+ this.id = id;
+ this.bufSize = bufSize;
+ this.buf = chc.alloc().buffer(bufSize);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ singleByte[0] = (byte) b;
+ write(singleByte, 0, 1);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ int currentOffset = off;
+ int bytesRemaining = len;
+
+ while (bytesRemaining + buf.readableBytes() > bufSize) {
+ int iterationLen = bufSize - buf.readableBytes();
+ writeInternal(b, currentOffset, iterationLen);
+ currentOffset += iterationLen;
+ bytesRemaining -= iterationLen;
+ }
+
+ if (bytesRemaining > 0) {
+ writeInternal(b, currentOffset, bytesRemaining);
+ }
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (buf.isReadable()) {
+ writeToChannel();
+ }
+ chc.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ throw new IOException("Already closed: " + id);
+ }
+
+ try {
+ flush();
+ } catch (IOException err) {
+ LOG.error("Error flushing stream before close", err);
+ }
+
+ try {
+ chc.close().addListener(listener).sync();
+ } catch (InterruptedException err) {
+ throw new IOException(err);
+ } finally {
+ buf.release();
+ buf = null;
+ chc = null;
+ closed = true;
+ }
+ }
+
+ private void writeToChannel() throws IOException {
+ if (closed) {
+ throw new IOException("Already closed: " + id);
+ }
+
+ chc.write(buf.copy()).addListener(listener);
+ buf.clear();
+ }
+
+ private void writeInternal(byte[] b, int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Already closed: " + id);
+ }
+
+ buf.writeBytes(b, off, len);
+ if (buf.readableBytes() >= bufSize) {
+ writeToChannel();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index a197d7b..b39f085 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -19,9 +19,6 @@ package org.apache.hadoop.hive.llap;
import java.util.Map;
import java.util.HashMap;
import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.io.InputStream;
import java.io.OutputStream;
import org.slf4j.Logger;
@@ -45,8 +42,22 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.Delimiters;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.util.concurrent.Future;
+
/**
*
@@ -57,18 +68,17 @@ public class LlapOutputFormatService {
private static LlapOutputFormatService service;
private final Map<String, RecordWriter> writers;
- private final ServerSocket socket;
private final HiveConf conf;
- private final ExecutorService executor;
private static final int WAIT_TIME = 5;
+ private static final int MAX_QUERY_ID_LENGTH = 256;
+
+ private EventLoopGroup eventLoopGroup;
+ private ServerBootstrap serverBootstrap;
+ private ChannelFuture listeningChannelFuture;
private LlapOutputFormatService() throws IOException {
writers = new HashMap<String, RecordWriter>();
conf = new HiveConf();
- executor = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LLAP output %d").build());
- socket = new ServerSocket(
- conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
}
public static LlapOutputFormatService get() throws IOException {
@@ -80,52 +90,34 @@ public class LlapOutputFormatService {
}
public void start() throws IOException {
- executor.submit(new Runnable() {
- byte[] buffer = new byte[4096];
- @Override
- public void run() {
- while (true) {
- Socket s = null;
- try {
- s = socket.accept();
- String id = readId(s);
- LOG.debug("Received: "+id);
- registerReader(s, id);
- } catch (IOException io) {
- if (s != null) {
- try{
- s.close();
- } catch (IOException io2) {
- // ignore
- }
- }
- }
- }
- }
-
- private String readId(Socket s) throws IOException {
- InputStream in = s.getInputStream();
- int idx = 0;
- while((buffer[idx++] = (byte)in.read()) != '\0') {}
- return new String(buffer,0,idx-1);
- }
-
- private void registerReader(Socket s, String id) throws IOException {
- synchronized(service) {
- LOG.debug("registering socket for: "+id);
- LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream());
- writers.put(id, writer);
- service.notifyAll();
- }
- }
- }
- );
+ LOG.info("Starting LlapOutputFormatService");
+
+ int port = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+ eventLoopGroup = new NioEventLoopGroup(1);
+ serverBootstrap = new ServerBootstrap();
+ serverBootstrap.group(eventLoopGroup);
+ serverBootstrap.channel(NioServerSocketChannel.class);
+ serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler());
+ try {
+ LOG.info("LlapOutputFormatService: Binding to port " + port);
+ listeningChannelFuture = serverBootstrap.bind(port).sync();
+ } catch (InterruptedException err) {
+ throw new IOException("LlapOutputFormatService: Error binding to port " + port, err);
+ }
}
public void stop() throws IOException, InterruptedException {
- executor.shutdown();
- executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
- socket.close();
+ LOG.info("Stopping LlapOutputFormatService");
+
+ if (listeningChannelFuture != null) {
+ listeningChannelFuture.channel().close().sync();
+ listeningChannelFuture = null;
+ } else {
+ LOG.warn("LlapOutputFormatService does not appear to have a listening port to close.");
+ }
+
+ Future terminationFuture = eventLoopGroup.shutdownGracefully(1, WAIT_TIME, TimeUnit.SECONDS);
+ terminationFuture.sync();
}
public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException {
@@ -139,4 +131,59 @@ public class LlapOutputFormatService {
LOG.info("Returning writer for: "+id);
return writer;
}
+
+ protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> {
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, String msg) {
+ String id = msg;
+ registerReader(ctx, id);
+ }
+
+ private void registerReader(ChannelHandlerContext ctx, String id) {
+ synchronized(service) {
+ LOG.debug("registering socket for: "+id);
+ int bufSize = 128 * 1024; // configurable?
+ OutputStream stream = new ChannelOutputStream(ctx, id, bufSize);
+ LlapRecordWriter writer = new LlapRecordWriter(stream);
+ writers.put(id, writer);
+
+ // Add listener to handle any cleanup for when the connection is closed
+ ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id));
+
+ service.notifyAll();
+ }
+ }
+ }
+
+ protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {
+ private String id;
+
+ LlapOutputFormatChannelCloseListener(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ RecordWriter writer = null;
+
+ synchronized (service) {
+ writer = writers.remove(id);
+ }
+
+ if (writer == null) {
+ LOG.warn("Did not find a writer for ID " + id);
+ }
+ }
+ }
+
+ protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(
+ new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()),
+ new StringDecoder(),
+ new StringEncoder(),
+ new LlapOutputFormatServiceHandler());
+ }
+ }
}
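----------------------------------------------------------------------
The pipeline installed in initChannel() frames the handshake on a NUL byte
(DelimiterBasedFrameDecoder capped at MAX_QUERY_ID_LENGTH), so a reader
announces itself by sending its id followed by '\0' and then consumes row
data on the same connection. A hedged client-side sketch; the host, port,
and id values are assumptions, not taken from the patch:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class LlapOutputClientSketch {
  public static void main(String[] args) throws Exception {
    String host = "localhost";      // assumed LLAP daemon host
    int port = 15003;               // assumed LLAP_DAEMON_OUTPUT_SERVICE_PORT value
    String id = "query-fragment-0"; // assumed id the writer was registered under

    try (Socket socket = new Socket(host, port)) {
      OutputStream out = socket.getOutputStream();
      out.write(id.getBytes("UTF-8"));
      out.write(0); // NUL delimiter consumed by DelimiterBasedFrameDecoder
      out.flush();

      // Everything after the handshake is row data written through ChannelOutputStream.
      InputStream in = socket.getInputStream();
      byte[] buffer = new byte[4096];
      int n;
      while ((n = in.read(buffer)) != -1) {
        System.out.write(buffer, 0, n);
      }
      System.out.flush();
    }
  }
}
----------------------------------------------------------------------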
http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 02439be..17f3895 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -203,17 +203,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
}
- try {
- if ("org.apache.hadoop.hive.llap.LlapStorageHandler".equals(getConf().getTableInfo().getProperties().
- get(hive_metastoreConstants.META_TABLE_STORAGE))) {
- (new LlapOutputFormat())
- .getRecordWriter(null,
- hconf instanceof JobConf ? (JobConf) hconf : new JobConf(hconf), null, null)
- .close(null);
- }
- } catch (IOException e) {
- // ignored
- }
+
try {
for (int i = 0; i < updaters.length; i++) {
if (updaters[i] != null) {