Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:24:59 UTC

[09/39] hive git commit: HIVE-13202: LLAP: Replace use of ServerSocket with netty in LlapOutputFormatService

HIVE-13202: LLAP: Replace use of ServerSocket with netty in LlapOutputFormatService


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/81b26df9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/81b26df9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/81b26df9

Branch: refs/heads/master
Commit: 81b26df9ed00e9db671c57aece8e51bf62365e34
Parents: d8a9531
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Mar 3 12:57:43 2016 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Mar 3 12:57:43 2016 -0800

----------------------------------------------------------------------
 .../apache/hive/jdbc/TestJdbcWithMiniLlap.java  |  74 +++++++--
 .../hadoop/hive/llap/ChannelOutputStream.java   | 141 +++++++++++++++++
 .../hive/llap/LlapOutputFormatService.java      | 155 ++++++++++++-------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   |  12 +-
 4 files changed, 308 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
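Note on the change: the hand-rolled ServerSocket accept loop in LlapOutputFormatService is replaced with a netty ServerBootstrap bound to the same configured port, and the raw socket OutputStream handed to LlapRecordWriter is replaced with a new ChannelOutputStream over the netty channel. A minimal, self-contained sketch of the ServerBootstrap pattern the patch adopts (class name, port, and empty pipeline are illustrative, not taken from this commit):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class SketchServer {
  public static void main(String[] args) throws InterruptedException {
    EventLoopGroup group = new NioEventLoopGroup(1); // single I/O thread, as in the patch
    try {
      ServerBootstrap b = new ServerBootstrap()
          .group(group)                              // one group for accept + I/O
          .channel(NioServerSocketChannel.class)
          .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
              // per-connection pipeline goes here
            }
          });
      ChannelFuture f = b.bind(9999).sync();   // bind() replaces new ServerSocket(port)
      f.channel().closeFuture().sync();        // block until the server channel closes
    } finally {
      group.shutdownGracefully();
    }
  }
}
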


http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
index 88e2e55..98daab4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java
@@ -37,6 +37,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -86,6 +88,7 @@ public class TestJdbcWithMiniLlap {
   private static Path kvDataFilePath;
   private static final String tmpDir = System.getProperty("test.tmp.dir");
 
+  private static HiveConf conf = null;
   private Connection hs2Conn = null;
 
   @BeforeClass
@@ -98,7 +101,7 @@ public class TestJdbcWithMiniLlap {
       System.out.println("Setting hive-site: "+HiveConf.getHiveSiteLocation());
     }
 
-    HiveConf conf = new HiveConf();
+    conf = new HiveConf();
     conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
     // Necessary for GetSplits()/LlapInputFormat,
     // the config generated for the query fragment needs to include the MapWork
@@ -109,7 +112,7 @@ public class TestJdbcWithMiniLlap {
     conf.addResource(new URL("file://" + new File(confDir).toURI().getPath()
         + "/llap-daemon-site.xml"));
 
-    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP, true);
+    miniHS2 = new MiniHS2(conf, MiniClusterType.LLAP);
 
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
     kvDataFilePath = new Path(dataFileDir, "kv1.txt");
@@ -160,21 +163,54 @@ public class TestJdbcWithMiniLlap {
     stmt.close();
   }
 
-  @Test
-  public void testLlapInputFormatEndToEnd() throws Exception {
-    createTestTable("testtab1");
+  private static boolean timedOut = false;
+
+  private static class TestTimerTask extends TimerTask {
+    private boolean timedOut = false;
+    private Thread threadToInterrupt;
+
+    public TestTimerTask(Thread threadToInterrupt) {
+      this.threadToInterrupt = threadToInterrupt;
+    }
+
+    @Override
+    public void run() {
+      System.out.println("Test timed out!");
+      timedOut = true;
+      threadToInterrupt.interrupt();
+    }
+
+    public boolean isTimedOut() {
+      return timedOut;
+    }
+
+    public void setTimedOut(boolean timedOut) {
+      this.timedOut = timedOut;
+    }
+
+  }
+
+  private int getLlapIFRowCount(String query, int numSplits) throws Exception {
+    // Add a timer task to stop this test if it has not finished in a reasonable amount of time.
+    Timer timer = new Timer();
+    long delay = 30000;
+    TestTimerTask timerTask = new TestTimerTask(Thread.currentThread());
+    timer.schedule(timerTask, delay);
+
+    // Setup LlapInputFormat
     String url = miniHS2.getJdbcURL();
     String user = System.getProperty("user.name");
     String pwd = user;
-    String query = "select * from testtab1 where under_col = 0";
 
     LlapInputFormat inputFormat = new LlapInputFormat(url, user, pwd, query);
-    JobConf job = new JobConf();
-    int numSplits = 1;
+
+    // Get splits
+    JobConf job = new JobConf(conf);
 
     InputSplit[] splits = inputFormat.getSplits(job, numSplits);
-    assert(splits.length > 0);
+    assertTrue(splits.length > 0);
 
+    // Fetch rows from splits
     boolean first = true;
     int rowCount = 0;
     for (InputSplit split : splits) {
@@ -198,6 +234,26 @@ public class TestJdbcWithMiniLlap {
         ++rowCount;
       }
     }
+
+    timer.cancel();
+    assertFalse("Test timed out", timerTask.isTimedOut());
+
+    return rowCount;
+  }
+
+  @Test
+  public void testLlapInputFormatEndToEnd() throws Exception {
+    createTestTable("testtab1");
+
+    int rowCount;
+
+    String query = "select * from testtab1 where under_col = 0";
+    rowCount = getLlapIFRowCount(query, 1);
     assertEquals(3, rowCount);
+
+    // Try empty rows query
+    query = "select * from testtab1 where true = false";
+    rowCount = getLlapIFRowCount(query, 1);
+    assertEquals(0, rowCount);
   }
 }
\ No newline at end of file

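The test refactor above guards each split fetch with a java.util.Timer that interrupts the test thread if it hangs, then asserts the watchdog never fired. The pattern in isolation (a hypothetical sketch; the 30-second delay matches the test, everything else is illustrative):

import java.util.Timer;
import java.util.TimerTask;

public class TimeoutSketch {
  public static void main(String[] args) {
    final Thread testThread = Thread.currentThread();
    Timer timer = new Timer(true); // daemon thread, so it never blocks JVM exit
    timer.schedule(new TimerTask() {
      @Override
      public void run() {
        testThread.interrupt();    // break the test out of blocking I/O
      }
    }, 30000L);
    try {
      doWork();                    // stand-in for the split fetch; may block
    } catch (InterruptedException e) {
      // the watchdog fired; a real test would record this and fail
    } finally {
      timer.cancel();              // always disarm the watchdog on the success path
    }
  }

  private static void doWork() throws InterruptedException {
    Thread.sleep(100);             // placeholder work
  }
}
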
http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
new file mode 100644
index 0000000..e861791
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/llap/ChannelOutputStream.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OutputStream to write to the Netty Channel
+ */
+public class ChannelOutputStream extends OutputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ChannelOutputStream.class);
+
+  private ChannelHandlerContext chc;
+  private int bufSize;
+  private String id;
+  private ByteBuf buf;
+  private byte[] singleByte = new byte[1];
+  private boolean closed = false;
+
+  private ChannelFutureListener listener = new ChannelFutureListener() {
+    @Override
+    public void operationComplete(ChannelFuture future) {
+      if (future.isCancelled()) {
+        LOG.error(id + " was cancelled");
+      } else if (!future.isSuccess()) {
+        LOG.error("Error on ID " + id, future.cause());
+      }
+    }
+  };
+
+  public ChannelOutputStream(ChannelHandlerContext chc, String id, int bufSize) {
+    this.chc = chc;
+    this.id = id;
+    this.bufSize = bufSize;
+    this.buf = chc.alloc().buffer(bufSize);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    singleByte[0] = (byte) b;
+    write(singleByte, 0, 1);
+  }
+
+  @Override
+  public void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    int currentOffset = off;
+    int bytesRemaining = len;
+
+    while (bytesRemaining + buf.readableBytes() > bufSize) {
+      int iterationLen = bufSize - buf.readableBytes();
+      writeInternal(b, currentOffset, iterationLen);
+      currentOffset += iterationLen;
+      bytesRemaining -= iterationLen;
+    }
+
+    if (bytesRemaining > 0) {
+      writeInternal(b, currentOffset, bytesRemaining);
+    }
+  }
+
+  @Override
+  public void flush() throws IOException {
+    if (buf.isReadable()) {
+      writeToChannel();
+    }
+    chc.flush();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed) {
+      throw new IOException("Already closed: " + id);
+    }
+
+    try {
+      flush();
+    } catch (IOException err) {
+      LOG.error("Error flushing stream before close", err);
+    }
+
+    try {
+      chc.close().addListener(listener).sync();
+    } catch (InterruptedException err) {
+      throw new IOException(err);
+    } finally {
+      buf.release();
+      buf = null;
+      chc = null;
+      closed = true;
+    }
+  }
+
+  private void writeToChannel() throws IOException {
+    if (closed) {
+      throw new IOException("Already closed: " + id);
+    }
+
+    chc.write(buf.copy()).addListener(listener);
+    buf.clear();
+  }
+
+  private void writeInternal(byte[] b, int off, int len) throws IOException {
+    if (closed) {
+      throw new IOException("Already closed: " + id);
+    }
+
+    buf.writeBytes(b, off, len);
+    if (buf.readableBytes() >= bufSize) {
+      writeToChannel();
+    }
+  }
+}

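The write(byte[], int, int) loop above is the heart of ChannelOutputStream: it tops the internal ByteBuf up to exactly bufSize, flushes a copy to the channel, and leaves any remainder buffered for the next call. A self-contained sketch of the same arithmetic, with netty swapped for a plain byte buffer so it runs standalone (names and the tiny bufSize are illustrative):

import java.io.ByteArrayOutputStream;

public class ChunkingSketch {
  private final int bufSize = 8;  // tiny for demonstration; the patch uses 128 KB
  private final ByteArrayOutputStream buf = new ByteArrayOutputStream();

  public void write(byte[] b, int off, int len) {
    int currentOffset = off;
    int bytesRemaining = len;
    // While the pending bytes would overflow the buffer, fill it exactly
    // to bufSize and flush; this mirrors the loop in ChannelOutputStream.
    while (bytesRemaining + buf.size() > bufSize) {
      int iterationLen = bufSize - buf.size();
      buf.write(b, currentOffset, iterationLen);
      flushChunk();
      currentOffset += iterationLen;
      bytesRemaining -= iterationLen;
    }
    if (bytesRemaining > 0) {
      buf.write(b, currentOffset, bytesRemaining); // remainder stays buffered
    }
  }

  private void flushChunk() {
    System.out.println("flush " + buf.size() + " bytes"); // stand-in for chc.write(buf.copy())
    buf.reset();
  }

  public static void main(String[] args) {
    byte[] data = new byte[20];
    new ChunkingSketch().write(data, 0, data.length);
    // Prints two 8-byte flushes; the final 4 bytes stay buffered until flush()/close().
  }
}
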
http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index a197d7b..b39f085 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -19,9 +19,6 @@ package org.apache.hadoop.hive.llap;
 import java.util.Map;
 import java.util.HashMap;
 import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.slf4j.Logger;
@@ -45,8 +42,22 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.Delimiters;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.util.concurrent.Future;
+
 
 /**
  *
@@ -57,18 +68,17 @@ public class LlapOutputFormatService {
 
   private static LlapOutputFormatService service;
   private final Map<String, RecordWriter> writers;
-  private final ServerSocket socket;
   private final HiveConf conf;
-  private final ExecutorService executor;
   private static final int WAIT_TIME = 5;
+  private static final int MAX_QUERY_ID_LENGTH = 256;
+
+  private EventLoopGroup eventLoopGroup;
+  private ServerBootstrap serverBootstrap;
+  private ChannelFuture listeningChannelFuture;
 
   private LlapOutputFormatService() throws IOException {
     writers = new HashMap<String, RecordWriter>();
     conf = new HiveConf();
-    executor = Executors.newSingleThreadExecutor(
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("LLAP output %d").build());
-    socket = new ServerSocket(
-      conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
   }
 
   public static LlapOutputFormatService get() throws IOException {
@@ -80,52 +90,34 @@ public class LlapOutputFormatService {
   }
 
   public void start() throws IOException {
-    executor.submit(new Runnable() {
-        byte[] buffer = new byte[4096];
-        @Override
-        public void run() {
-          while (true) {
-            Socket s = null;
-            try {
-              s = socket.accept();
-              String id = readId(s);
-              LOG.debug("Received: "+id);
-              registerReader(s, id);
-            } catch (IOException io) {
-              if (s != null) {
-                try{
-                  s.close();
-                } catch (IOException io2) {
-                  // ignore
-                }
-              }
-            }
-          }
-        }
-
-        private String readId(Socket s) throws IOException {
-          InputStream in = s.getInputStream();
-          int idx = 0;
-          while((buffer[idx++] = (byte)in.read()) != '\0') {}
-          return new String(buffer,0,idx-1);
-        }
-
-        private void registerReader(Socket s, String id) throws IOException {
-          synchronized(service) {
-            LOG.debug("registering socket for: "+id);
-            LlapRecordWriter writer = new LlapRecordWriter(s.getOutputStream());
-            writers.put(id, writer);
-            service.notifyAll();
-          }
-        }
-      }
-      );
+    LOG.info("Starting LlapOutputFormatService");
+
+    int port = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+    eventLoopGroup = new NioEventLoopGroup(1);
+    serverBootstrap = new ServerBootstrap();
+    serverBootstrap.group(eventLoopGroup);
+    serverBootstrap.channel(NioServerSocketChannel.class);
+    serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler());
+    try {
+      LOG.info("LlapOutputFormatService: Binding to port " + port);
+      listeningChannelFuture = serverBootstrap.bind(port).sync();
+    } catch (InterruptedException err) {
+      throw new IOException("LlapOutputFormatService: Error binding to port " + port, err);
+    }
   }
 
   public void stop() throws IOException, InterruptedException {
-    executor.shutdown();
-    executor.awaitTermination(WAIT_TIME, TimeUnit.SECONDS);
-    socket.close();
+    LOG.info("Stopping LlapOutputFormatService");
+
+    if (listeningChannelFuture != null) {
+      listeningChannelFuture.channel().close().sync();
+      listeningChannelFuture = null;
+    } else {
+      LOG.warn("LlapOutputFormatService does not appear to have a listening port to close.");
+    }
+
+    Future terminationFuture = eventLoopGroup.shutdownGracefully(1, WAIT_TIME, TimeUnit.SECONDS);
+    terminationFuture.sync();
   }
 
   public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException {
@@ -139,4 +131,59 @@ public class LlapOutputFormatService {
     LOG.info("Returning writer for: "+id);
     return writer;
   }
+
+  protected class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<String> {
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, String msg) {
+      String id = msg;
+      registerReader(ctx, id);
+    }
+
+    private void registerReader(ChannelHandlerContext ctx, String id) {
+      synchronized(service) {
+        LOG.debug("registering socket for: "+id);
+        int bufSize = 128 * 1024; // configurable?
+        OutputStream stream = new ChannelOutputStream(ctx, id, bufSize);
+        LlapRecordWriter writer = new LlapRecordWriter(stream);
+        writers.put(id, writer);
+
+        // Add listener to handle any cleanup for when the connection is closed
+        ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id));
+
+        service.notifyAll();
+      }
+    }
+  }
+
+  protected class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {
+    private String id;
+
+    LlapOutputFormatChannelCloseListener(String id) {
+      this.id = id;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      RecordWriter writer = null;
+
+      synchronized (service) {
+        writer = writers.remove(id);
+      }
+
+      if (writer == null) {
+        LOG.warn("Did not find a writer for ID " + id);
+      }
+    }
+  }
+
+  protected class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> {
+    @Override
+    public void initChannel(SocketChannel ch) throws Exception {
+        ch.pipeline().addLast(
+            new DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()),
+            new StringDecoder(),
+            new StringEncoder(),
+            new LlapOutputFormatServiceHandler());
+    }
+  }
 }

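The new pipeline also replaces the old readId() byte-at-a-time loop: DelimiterBasedFrameDecoder with Delimiters.nulDelimiter() frames the handshake, StringDecoder turns it into the writer id, and MAX_QUERY_ID_LENGTH caps the frame at 256 bytes. From the client's side the wire protocol is unchanged: send the id terminated by '\0', then read row bytes back. A hypothetical client sketch in plain java.net (host, port, and id are placeholders, not values from this commit):

import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class ClientSketch {
  public static void main(String[] args) throws Exception {
    try (Socket s = new Socket("localhost", 15003)) { // port is illustrative
      OutputStream out = s.getOutputStream();
      // Handshake: the id, NUL-terminated, matches the server's
      // DelimiterBasedFrameDecoder(MAX_QUERY_ID_LENGTH, Delimiters.nulDelimiter()).
      out.write("some-fragment-id".getBytes(StandardCharsets.UTF_8));
      out.write(0);
      out.flush();

      // Everything after the handshake is ChannelOutputStream output.
      InputStream in = s.getInputStream();
      byte[] buffer = new byte[4096];
      int n;
      while ((n = in.read(buffer)) != -1) {
        System.out.println("read " + n + " bytes");
      }
    }
  }
}
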
http://git-wip-us.apache.org/repos/asf/hive/blob/81b26df9/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 02439be..17f3895 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -203,17 +203,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           }
         }
       }
-      try {
-        if ("org.apache.hadoop.hive.llap.LlapStorageHandler".equals(getConf().getTableInfo().getProperties().
-            get(hive_metastoreConstants.META_TABLE_STORAGE))) {
-          (new LlapOutputFormat())
-              .getRecordWriter(null,
-                  hconf instanceof JobConf ? (JobConf) hconf : new JobConf(hconf), null, null)
-              .close(null);
-        }
-      } catch (IOException e) {
-        // ignored
-      }
+
       try {
         for (int i = 0; i < updaters.length; i++) {
           if (updaters[i] != null) {