You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/06 20:43:07 UTC
[36/50] [abbrv] hive git commit: HIVE-13695: LlapOutputFormatService
port should be able to be set via conf
HIVE-13695: LlapOutputFormatService port should be able to be set via conf
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/03ee0481
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/03ee0481
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/03ee0481
Branch: refs/heads/java8
Commit: 03ee0481a518585a4a92875d88c560ff525d75d4
Parents: 2a03f1f
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu May 5 12:56:20 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu May 5 12:56:20 2016 -0700
----------------------------------------------------------------------
.../hive/llap/daemon/impl/LlapDaemon.java | 6 +++
.../hive/llap/daemon/MiniLlapCluster.java | 3 ++
.../hive/llap/LlapOutputFormatService.java | 44 +++++++++++++-------
3 files changed, 38 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/03ee0481/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 223c390..b3c1abf 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -132,6 +132,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
"Work dirs must be specified");
Preconditions.checkArgument(shufflePort == 0 || (shufflePort > 1024 && shufflePort < 65536),
"Shuffle Port must be betwee 1024 and 65535, or 0 for automatic selection");
+ int outputFormatServicePort = HiveConf.getIntVar(daemonConf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+ Preconditions.checkArgument(outputFormatServicePort == 0
+ || (outputFormatServicePort > 1024 && outputFormatServicePort < 65536),
+ "OutputFormatService Port must be between 1024 and 65535, or 0 for automatic selection");
String hosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
if (hosts.startsWith("@")) {
String zkHosts = HiveConf.getTrimmedVar(daemonConf, ConfVars.HIVE_ZOOKEEPER_QUORUM);
@@ -165,6 +169,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
", rpcListenerPort=" + srvPort +
", mngListenerPort=" + mngPort +
", webPort=" + webPort +
+ ", outputFormatSvcPort=" + outputFormatServicePort +
", workDirs=" + Arrays.toString(localDirs) +
", shufflePort=" + shufflePort +
", executorMemory=" + executorMemoryBytes +
@@ -335,6 +340,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
this.shufflePort.set(ShuffleHandler.get().getPort());
getConfig()
.setInt(ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT.varname, ShuffleHandler.get().getPort());
+ LlapOutputFormatService.initializeAndStart(getConfig());
super.serviceStart();
// Setup the actual ports in the configuration.
http://git-wip-us.apache.org/repos/asf/hive/blob/03ee0481/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
----------------------------------------------------------------------
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
index dde5be0..e394191 100644
--- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java
@@ -166,6 +166,7 @@ public class MiniLlapCluster extends AbstractService {
int mngPort = 0;
int shufflePort = 0;
int webPort = 0;
+ int outputFormatServicePort = 0;
boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false);
LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf);
if (usePortsFromConf) {
@@ -173,7 +174,9 @@ public class MiniLlapCluster extends AbstractService {
mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT);
shufflePort = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT);
+ outputFormatServicePort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
}
+ HiveConf.setIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT, outputFormatServicePort);
if (ownZkCluster) {
miniZooKeeperCluster = new MiniZooKeeperCluster();
http://git-wip-us.apache.org/repos/asf/hive/blob/03ee0481/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 6adbf7c..f852041 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.BytesWritable;
@@ -39,9 +40,13 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+
+import com.google.common.base.Preconditions;
+
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
@@ -67,9 +72,12 @@ public class LlapOutputFormatService {
private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class);
- private static LlapOutputFormatService service;
+ private static final AtomicBoolean started = new AtomicBoolean(false);
+ private static final AtomicBoolean initing = new AtomicBoolean(false);
+ private static LlapOutputFormatService INSTANCE;
+
private final Map<String, RecordWriter> writers;
- private final HiveConf conf;
+ private final Configuration conf;
private static final int WAIT_TIME = 5;
private static final int MAX_QUERY_ID_LENGTH = 256;
@@ -78,23 +86,29 @@ public class LlapOutputFormatService {
private ChannelFuture listeningChannelFuture;
private int port;
- private LlapOutputFormatService() throws IOException {
+ private LlapOutputFormatService(Configuration conf) throws IOException {
writers = new HashMap<String, RecordWriter>();
- conf = new HiveConf();
+ this.conf = conf;
}
- public static LlapOutputFormatService get() throws IOException {
- if (service == null) {
- service = new LlapOutputFormatService();
- service.start();
+ public static void initializeAndStart(Configuration conf) throws Exception {
+ if (!initing.getAndSet(true)) {
+ INSTANCE = new LlapOutputFormatService(conf);
+ INSTANCE.start();
+ started.set(true);
}
- return service;
+ }
+
+ public static LlapOutputFormatService get() throws IOException {
+ Preconditions.checkState(started.get(),
+ "LlapOutputFormatService must be started before invoking get");
+ return INSTANCE;
}
public void start() throws IOException {
LOG.info("Starting LlapOutputFormatService");
- int portFromConf = conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
+ int portFromConf = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
eventLoopGroup = new NioEventLoopGroup(1);
serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventLoopGroup);
@@ -125,10 +139,10 @@ public class LlapOutputFormatService {
public <K,V> RecordWriter<K, V> getWriter(String id) throws IOException, InterruptedException {
RecordWriter writer = null;
- synchronized(service) {
+ synchronized(INSTANCE) {
while ((writer = writers.get(id)) == null) {
LOG.info("Waiting for writer for: "+id);
- service.wait();
+ INSTANCE.wait();
}
}
LOG.info("Returning writer for: "+id);
@@ -147,7 +161,7 @@ public class LlapOutputFormatService {
}
private void registerReader(ChannelHandlerContext ctx, String id) {
- synchronized(service) {
+ synchronized(INSTANCE) {
LOG.debug("registering socket for: "+id);
int bufSize = 128 * 1024; // configable?
OutputStream stream = new ChannelOutputStream(ctx, id, bufSize);
@@ -157,7 +171,7 @@ public class LlapOutputFormatService {
// Add listener to handle any cleanup for when the connection is closed
ctx.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(id));
- service.notifyAll();
+ INSTANCE.notifyAll();
}
}
}
@@ -173,7 +187,7 @@ public class LlapOutputFormatService {
public void operationComplete(ChannelFuture future) throws Exception {
RecordWriter writer = null;
- synchronized (service) {
+ synchronized (INSTANCE) {
writer = writers.remove(id);
}