You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/06 17:25:26 UTC

[36/39] hive git commit: HIVE-13695: LlapOutputFormatService port should be able to be set via conf

HIVE-13695: LlapOutputFormatService port should be able to be set via conf


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/03ee0481
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/03ee0481
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/03ee0481

Branch: refs/heads/master
Commit: 03ee0481a518585a4a92875d88c560ff525d75d4
Parents: 2a03f1f
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu May 5 12:56:20 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu May 5 12:56:20 2016 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/impl/LlapDaemon.java       |  6 +++
 .../hive/llap/daemon/MiniLlapCluster.java       |  3 ++
 .../hive/llap/LlapOutputFormatService.java      | 44 +++++++++++++-------
 3 files changed, 38 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/03ee0481/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 223c390..b3c1abf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -132,6 +132,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         "Work dirs must be specified");
     Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536),
         "Shuffle Port must be betwee 1024 and 65535, or 0 for automatic selection");
+    int outputFormatServicePort = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+    Preconditions.checkArgument(outputFormatServicePort == 0
+        || (outputFormatServicePort > 1024 && outputFormatServicePort < 65536),
+        "OutputFormatService Port must be between 1024 and 65535, or 0 for automatic selection");
     String hosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
     if (hosts.startsWith("@")) {
       String zkHosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.HIVE_ZOOKEEPER_QUORUM);
@@ -165,6 +169,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
         ", rpcListenerPort=" + srvPort +
         ", mngListenerPort=" + mngPort +
         ", webPort=" + webPort +
+        ", outputFormatSvcPort=" + outputFormatServicePort +
         ", workDirs=" + Arrays.toString(localDirs) +
         ", shufflePort=" + shufflePort +
         ", executorMemory=" + executorMemoryBytes +
@@ -335,6 +340,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     this.shufflePort.set(ShuffleHandler.get().getPort());
     getConfig()
         .setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort());
+    LlapOutputFormatService.initializeAndStart(getConfig());
     super.serviceStart();
 
     // Setup the actual ports in the configuration.

http://git-wip-us.apache.org/repos/asf/hive/blob/03ee0481/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index dde5be0..e394191 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -166,6 +166,7 @@ public class MiniLlapCluster extends AbstractService {
     int mngPort = 0;
     int shufflePort = 0;
     int webPort = 0;
+    int outputFormatServicePort = 0;
     boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false);
     LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf);
     if (usePortsFromConf) {
@@ -173,7 +174,9 @@ public class MiniLlapCluster extends AbstractService {
       mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
       shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
       webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+      outputFormatServicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
     }
+    HiveConf.setIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, outputFormatServicePort);
 
     if (ownZkCluster) {
       miniZooKeeperCluster = new MiniZooKeeperCluster();

http://git-wip-us.apache.org/repos/asf/hive/blob/03ee0481/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 6adbf7c..f852041 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.BytesWritable;
@@ -39,9 +40,13 @@ import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+
+import com.google.common.base.Preconditions;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -67,9 +72,12 @@ public class LlapOutputFormatService {
 
   private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class);
 
-  private static LlapOutputFormatService service;
+  private static final AtomicBoolean started = new AtomicBoolean(false);
+  private static final AtomicBoolean initing = new AtomicBoolean(false);
+  private static LlapOutputFormatService INSTANCE;
+
   private final Map<String, RecordWriter> writers;
-  private final HiveConf conf;
+  private final Configuration conf;
   private static final int WAIT_TIME = 5;
   private static final int MAX_QUERY_ID_LENGTH = 256;
 
@@ -78,23 +86,29 @@ public class LlapOutputFormatService {
   private ChannelFuture listeningChannelFuture;
   private int port;
 
-  private LlapOutputFormatService() throws IOException {
+  private LlapOutputFormatService(Configuration conf) throws IOException {
     writers = new HashMap<String, RecordWriter>();
-    conf = new HiveConf();
+    this.conf = conf;
   }
 
-  public static LlapOutputFormatService get() throws IOException {
-    if (service == null) {
-      service = new LlapOutputFormatService();
-      service.start();
+  public static void initializeAndStart(Configuration conf) throws Exception {
+    if (!initing.getAndSet(true)) {
+      INSTANCE = new LlapOutputFormatService(conf);
+      INSTANCE.start();
+      started.set(true);
     }
-    return service;
+  }
+
+  public static LlapOutputFormatService get() throws IOException {
+    Preconditions.checkState(started.get(),
+        "LlapOutputFormatService must be started before invoking get");
+    return INSTANCE;
   }
 
   public void start() throws IOException {
     LOG.info("Starting LlapOutputFormatService");
 
-    int portFromConf = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+    int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
     eventLoopGroup = new NioEventLoopGroup(1);
     serverBootstrap = new ServerBootstrap();
     serverBootstrap.group(eventLoopGroup);
@@ -125,10 +139,10 @@ public class LlapOutputFormatService {
 
   public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException {
     RecordWriter writer = null;
-    synchronized(service) {
+    synchronized(INSTANCE) {
       while ((writer = writers.get(id)) == null) {
         LOG.info("Waiting for writer for: "+id);
-        service.wait();
+        INSTANCE.wait();
       }
     }
     LOG.info("Returning writer for: "+id);
@@ -147,7 +161,7 @@ public class LlapOutputFormatService {
     }
 
     private void registerReader(ChannelHandlerContext ctx, String id) {
-      synchronized(service) {
+      synchronized(INSTANCE) {
         LOG.debug("registering socket for: "+id);
         int bufSize = 128 * 1024; // configable?
         OutputStream stream = new ChannelOutputStream(ctx, id, bufSize);
@@ -157,7 +171,7 @@ public class LlapOutputFormatService {
         // Add listener to handle any cleanup for when the connection is closed
         ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id));
 
-        service.notifyAll();
+        INSTANCE.notifyAll();
       }
     }
   }
@@ -173,7 +187,7 @@ public class LlapOutputFormatService {
     public void operationComplete(ChannelFuture future) throws Exception {
       RecordWriter writer = null;
 
-      synchronized (service) {
+      synchronized (INSTANCE) {
         writer = writers.remove(id);
       }