You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org> on 2016/02/23 16:14:15 UTC
Change in asterixdb[master]: Introduce NC to NC Messaging
abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/657
Change subject: Introduce NC to NC Messaging
......................................................................
Introduce NC to NC Messaging
This change introduces NC to NC messaging. NC message
broker takes care of listening to NC message port and
sending messages to other NCs.
Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
A asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-external-data/pom.xml
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
33 files changed, 617 insertions(+), 59 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/1
diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..e3493c6 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,7 @@
if (tempPath.endsWith(File.separator)) {
tempPath = tempPath.substring(0, tempPath.length() - 1);
}
- //get initial partitions from properties
+ // get initial partitions from properties
String[] nodeStores = propertiesAccessor.getStores().get(ncName);
if (nodeStores == null) {
throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +97,7 @@
tempDirPath += File.separator;
}
for (int p = 0; p < nodeStores.length; p++) {
- //create IO devices based on stores
+ // create IO devices based on stores
String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
File ioDeviceDir = new File(iodevicePath);
ioDeviceDir.mkdirs();
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4922ae6..58523f4 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -122,7 +122,7 @@
if (replicationEnabled) {
if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
- //Try to perform remote recovery
+ // Try to perform remote recovery
IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
if (autoFailover) {
remoteRecoveryMgr.startFailbackProcess();
@@ -153,13 +153,13 @@
}
private void startReplicationService() {
- //Open replication channel
+ // Open replication channel
runtimeContext.getReplicationChannel().start();
- //Check the state of remote replicas
+ // Check the state of remote replicas
runtimeContext.getReplicationManager().initializeReplicasState();
- //Start replication after the state of remote replicas has been initialized.
+ // Start replication after the state of remote replicas has been initialized.
runtimeContext.getReplicationManager().startReplicationThreads();
}
@@ -176,10 +176,10 @@
MetadataBootstrap.stopUniverse();
}
- //Clean any temporary files
+ // Clean any temporary files
performLocalCleanUp();
- //Note: stopping recovery manager will make a sharp checkpoint
+ // Note: stopping recovery manager will make a sharp checkpoint
ncApplicationContext.getLifeCycleComponentManager().stopAll(false);
runtimeContext.deinitialize();
} else {
@@ -191,7 +191,7 @@
@Override
public void notifyStartupComplete() throws Exception {
- //Send max resource id on this NC to the CC
+ // Send max resource id on this NC to the CC
((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
@@ -245,15 +245,15 @@
}
}
- //Clean any temporary files
+ // Clean any temporary files
performLocalCleanUp();
}
private void performLocalCleanUp() {
- //Delete working area files from failed jobs
+ // Delete working area files from failed jobs
runtimeContext.getIOManager().deleteWorkspaceFiles();
- //Reclaim storage for temporary datasets.
+ // Reclaim storage for temporary datasets.
String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
.getStorageMountingPoints();
@@ -263,9 +263,9 @@
FileUtils.deleteQuietly(new File(tempDatasetsDir));
}
- //TODO
- //Reclaim storage for orphaned index artifacts in NCs.
- //Note: currently LSM indexes invalid components are deleted when an index is activated.
+ // TODO
+ // Reclaim storage for orphaned index artifacts in NCs.
+ // Note: currently LSM indexes invalid components are deleted when an index is activated.
}
private void updateOnNodeJoin() {
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..de3848e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -24,7 +24,7 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
import org.apache.asterix.common.messaging.AbstractApplicationMessage;
import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
@@ -43,6 +43,7 @@
import org.apache.asterix.common.replication.ReplicaEvent;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.om.util.AsterixClusterProperties;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -54,16 +55,21 @@
private final NodeControllerService ncs;
private final AtomicLong messageId = new AtomicLong(0);
private final Map<Long, IApplicationMessageCallback> callbacks;
- private final IAsterixAppRuntimeContext appContext;
+ private final NCMessagingServer messageServer;
+
+ private final AsterixAppRuntimeContext appContext;
public NCMessageBroker(NodeControllerService ncs) {
this.ncs = ncs;
- appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+ // Start listening to NC messages
+ appContext = (AsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+ messageServer = new NCMessagingServer(appContext.getMetadataProperties().getNodeMessagingPort(ncs.getId()));
+ messageServer.start();
}
@Override
- public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
+ public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
if (callback != null) {
long uniqueMessageId = messageId.incrementAndGet();
message.setId(uniqueMessageId);
@@ -73,7 +79,7 @@
ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
} catch (Exception e) {
if (callback != null) {
- //remove the callback in case of failure
+ // remove the callback in case of failure
callbacks.remove(message.getId());
}
throw e;
@@ -131,7 +137,7 @@
//send response after takeover is completed
TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
appContext.getTransactionSubsystem().getId(), msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
@@ -142,18 +148,18 @@
} finally {
TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
appContext.getTransactionSubsystem().getId());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
@Override
public void reportMaxResourceId() throws Exception {
ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
- //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
+ // resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
maxResourceIdMsg.setMaxResourceId(maxResourceId);
- sendMessage(maxResourceIdMsg, null);
+ sendMessageToCC(maxResourceIdMsg, null);
}
private void handleReplicaEvent(IMessage message) {
@@ -195,7 +201,7 @@
//send response after partitions prepared for failback
PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
msg.getRequestId(), msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
private void handleCompleteFailbackRequest(IMessage message) throws Exception {
@@ -206,7 +212,15 @@
} finally {
CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
msg.getRequestId(), msg.getPartitions());
- sendMessage(reponse, null);
+ sendMessageToCC(reponse, null);
}
}
+
+ @Override
+ public void sendMessageToNC(String to, IApplicationMessage message, IApplicationMessageCallback callback)
+ throws Exception {
+ NCMessagingClient client = new NCMessagingClient(AsterixClusterProperties.INSTANCE.getNodeIpAddress(to),
+ appContext.getMetadataProperties().getNodeMessagingPort(ncs.getId()), 4);
+ client.start();
+ }
}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
new file mode 100644
index 0000000..e34a0f3
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+public class NCMessageDecoder extends ReplayingDecoder<Void> {
+
+ @Override
+ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+ System.err.println("NCMessageDecoder: decode called");
+ out.add(in.readBytes(4));
+ }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
new file mode 100644
index 0000000..63eb888
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import javax.net.ssl.SSLException;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+public class NCMessagingClient {
+ private final String host;
+ private final int port;
+ private final int size;
+
+ public NCMessagingClient(String host, int port, int size) {
+ this.host = host;
+ this.port = port;
+ this.size = size;
+ }
+
+ public void start() throws SSLException, InterruptedException {
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ Bootstrap b = new Bootstrap();
+ b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline p = ch.pipeline();
+ p.addLast(new NCMessagingClientHandler(size));
+ }
+ });
+
+ // Make the connection attempt.
+ ChannelFuture f = b.connect(host, port).sync();
+
+ // Wait until the connection is closed.
+ f.channel().closeFuture().sync();
+ } finally {
+ group.shutdownGracefully();
+ }
+ }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
new file mode 100644
index 0000000..40dd153
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+public class NCMessagingClientHandler extends SimpleChannelInboundHandler<Object> {
+
+ private ByteBuf content;
+ private ChannelHandlerContext ctx;
+ private int size;
+
+ public NCMessagingClientHandler(int size) {
+ this.size = size;
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+
+ // Initialize the message.
+ content = ctx.alloc().directBuffer(size);
+ content.writeInt(10);
+
+ // Send the initial messages.
+ generateTraffic();
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ content.release();
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+ // Server is supposed to send nothing, but if it sends something, discard it.
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ // Close the connection when an exception is raised.
+ cause.printStackTrace();
+ ctx.close();
+ }
+
+ long counter;
+
+ private void generateTraffic() {
+ // Flush the outbound buffer to the socket.
+ // Once flushed, generate the same amount of traffic again.
+ ctx.writeAndFlush(content.duplicate().retain()).addListener(trafficGenerator);
+ }
+
+ private final ChannelFutureListener trafficGenerator = new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) {
+ if (future.isSuccess()) {
+ future.channel().close();
+ } else {
+ future.cause().printStackTrace();
+ future.channel().close();
+ }
+ }
+ };
+
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
new file mode 100644
index 0000000..3cb6aaa
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class NCMessagingHandler extends ChannelInboundHandlerAdapter {
+ private ByteBuf buf;
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) {
+ buf = ctx.alloc().buffer(4);
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) {
+ buf.release();
+ buf = null;
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ System.err.println("channelRead Called");
+ ByteBuf m = (ByteBuf) msg;
+ buf.writeBytes(m);
+ m.release();
+
+ if (buf.readableBytes() >= 4) {
+ System.out.println(buf.readInt());
+ ctx.close();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ cause.printStackTrace();
+ ctx.close();
+ }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
new file mode 100644
index 0000000..dbf4c0e
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+public class NCMessagingServer {
+ private int port;
+ private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
+ private ChannelFuture f;
+
+ public NCMessagingServer(int port) {
+ this.port = port;
+ }
+
+ public void stop() {
+ f.channel().close();
+ }
+
+ public void start() {
+ Thread thread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(new NCMessageDecoder(), new NCMessagingHandler());
+ }
+ }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+ // Bind and start to accept incoming connections.
+ ChannelFuture f = b.bind(port).sync();
+
+ // Wait until the server socket is closed.
+ // In this example, this does not happen, but you can do that to gracefully
+ // shut down your server.
+ f.channel().closeFuture().sync();
+ } catch (Throwable th) {
+ // Do something
+ th.printStackTrace();
+ } finally {
+ workerGroup.shutdownGracefully();
+ bossGroup.shutdownGracefully();
+ }
+ }
+ });
+ thread.setName("NCMessagingService:" + port);
+ thread.start();
+ }
+}
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 731113b..2dd5cfc 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,10 +20,12 @@
<metadataNode>asterix_nc1</metadataNode>
<store>
<ncId>asterix_nc1</ncId>
+ <nc_messaging_port>4501</nc_messaging_port>
<storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<store>
<ncId>asterix_nc2</ncId>
+ <nc_messaging_port>4502</nc_messaging_port>
<storeDirs>iodevice0,iodevice1</storeDirs>
</store>
<transactionLogDir>
diff --git a/asterix-app/src/main/resources/cluster.xml b/asterix-app/src/main/resources/cluster.xml
new file mode 100644
index 0000000..7c7e321
--- /dev/null
+++ b/asterix-app/src/main/resources/cluster.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+ <instance_name>asterix</instance_name>
+ <store>storage</store>
+ <master_node>
+ <id>master</id>
+ <client_ip>127.0.0.1</client_ip>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <client_port>1098</client_port>
+ <cluster_port>1099</cluster_port>
+ <http_port>8888</http_port>
+ </master_node>
+ <nc_messaging_port>4503</nc_messaging_port>
+ <node>
+ <id>nc1</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <nc_messaging_port>4501</nc_messaging_port>
+ </node>
+ <node>
+ <id>nc2</id>
+ <cluster_ip>127.0.0.1</cluster_ip>
+ <nc_messaging_port>4502</nc_messaging_port>
+ </node>
+</cluster>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 473a163..289a8ed 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -69,4 +69,8 @@
public Map<String, String> getTransactionLogDirs() {
return accessor.getTransactionLogDirs();
}
+
+ public int getNodeMessagingPort(String nodeId) {
+ return accessor.getNodeMessagingPort(nodeId);
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 13ce403..3c31936 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -57,7 +57,8 @@
private final Map<String, String> transactionLogDirs;
private final Map<String, String> asterixBuildProperties;
private final Map<String, ClusterPartition[]> nodePartitionsMap;
- private SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final Map<String, Integer> node2NCMessagingPort;
public AsterixPropertiesAccessor() throws AsterixException {
String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
@@ -86,7 +87,9 @@
instanceName = asterixConfiguration.getInstanceName();
metadataNodeName = asterixConfiguration.getMetadataNode();
stores = new HashMap<String, String[]>();
+ node2NCMessagingPort = new HashMap<>();
List<Store> configuredStores = asterixConfiguration.getStore();
+
nodeNames = new HashSet<String>();
nodePartitionsMap = new HashMap<>();
clusterPartitions = new TreeMap<>();
@@ -103,6 +106,7 @@
stores.put(store.getNcId(), nodeStores);
nodePartitionsMap.put(store.getNcId(), nodePartitions);
nodeNames.add(store.getNcId());
+ node2NCMessagingPort.put(store.getNcId(), Integer.parseInt(store.getNcMessagingPort()));
}
asterixConfigurationParams = new HashMap<String, Property>();
for (Property p : asterixConfiguration.getProperty()) {
@@ -208,4 +212,8 @@
public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
return clusterPartitions;
}
+
+ public int getNodeMessagingPort(String nodeId) {
+ return node2NCMessagingPort.get(nodeId);
+ }
}
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5d2e263..95fe37b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -42,7 +42,7 @@
/**
* Sets a unique message id that identifies this message within an NC.
- * This id is set by {@link INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+ * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
* when the callback is not null to notify the sender when the response to that message is received.
*
* @param messageId
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 41f8a0c..724ed04 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,17 @@
* @param callback
* @throws Exception
*/
- public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+ public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+
+ /**
+ * Sends application message from this NC to another NC.
+ *
+ * @param message
+ * @param callback
+ * @throws Exception
+ */
+ public void sendMessageToNC(String to, IApplicationMessage message, IApplicationMessageCallback callback)
+ throws Exception;
/**
* Sends the maximum resource id on this NC to the CC.
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index bb99319..3a66421 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -56,12 +56,16 @@
<xs:element
name="txnLogDirPath"
type="xs:string" />
-
+ <xs:element
+ name="nc_messaging_port"
+ type="xs:string" />
+
<!-- definition of complex elements -->
<xs:element name="store">
<xs:complexType>
<xs:sequence>
<xs:element ref="mg:ncId" />
+ <xs:element ref="mg:nc_messaging_port" />
<xs:element ref="mg:storeDirs" />
</xs:sequence>
</xs:complexType>
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..1ec9a64 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -57,6 +57,7 @@
<xs:element name="result_time_to_live" type="xs:long" />
<xs:element name="result_sweep_threshold" type="xs:long" />
<xs:element name="cc_root" type="xs:string" />
+ <xs:element name="nc_messaging_port" type="xs:integer" />
<!-- definition of complex elements -->
<xs:element name="working_dir">
@@ -123,6 +124,7 @@
<xs:element ref="cl:txn_log_dir" minOccurs="0" />
<xs:element ref="cl:iodevices" minOccurs="0" />
<xs:element ref="cl:debug_port" minOccurs="0" />
+ <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
</xs:sequence>
</xs:complexType>
</xs:element>
@@ -151,6 +153,7 @@
<xs:element ref="cl:metadata_node" />
<xs:element ref="cl:data_replication" minOccurs="0" />
<xs:element ref="cl:master_node" />
+ <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
<xs:element ref="cl:node" maxOccurs="unbounded" />
<xs:element ref="cl:substitute_nodes" />
<xs:element ref="cl:heartbeat_period" minOccurs="0" />
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 29765fd..c92262c 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
public class EventDriver {
public static final String CLIENT_NODE_ID = "client_node";
- public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
+ public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
private static String eventsDir;
private static Events events;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
index e0e5bc4..b8c7f7c 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
@@ -25,13 +25,12 @@
import java.util.Timer;
import java.util.TimerTask;
-import org.apache.log4j.Logger;
-
import org.apache.asterix.event.driver.EventDriver;
import org.apache.asterix.event.schema.cluster.Node;
import org.apache.asterix.event.schema.event.Event;
import org.apache.asterix.event.schema.pattern.Pattern;
import org.apache.asterix.event.schema.pattern.Period;
+import org.apache.log4j.Logger;
public class EventTask extends TimerTask {
@@ -68,8 +67,8 @@
this.interval = EventUtil.parseTimeInterval(period.getAbsvalue(), period.getUnit());
}
if (pattern.getDelay() != null) {
- this.initialDelay = EventUtil.parseTimeInterval(new ValueType(pattern.getDelay().getValue()), pattern
- .getDelay().getUnit());
+ this.initialDelay = EventUtil.parseTimeInterval(new ValueType(pattern.getDelay().getValue()),
+ pattern.getDelay().getUnit());
}
if (pattern.getMaxOccurs() != null) {
this.maxOccurs = pattern.getMaxOccurs();
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index b83faa2..c764cd7 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -186,12 +186,12 @@
}
if (nodeid.equals(cluster.getMasterNode().getId())) {
- String logDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
- .getLogDir();
- String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
- .getMasterNode().getJavaHome();
+ String logDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
+ : cluster.getMasterNode().getLogDir();
+ String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome()
+ : cluster.getMasterNode().getJavaHome();
return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
- null, null, cluster.getMasterNode().getDebugPort());
+ null, null, cluster.getMasterNode().getDebugPort(), null);
}
List<Node> nodeList = cluster.getNode();
@@ -231,8 +231,8 @@
pb.start();
}
- public static void executeLocalScript(Node node, String script, List<String> args) throws IOException,
- InterruptedException {
+ public static void executeLocalScript(Node node, String script, List<String> args)
+ throws IOException, InterruptedException {
List<String> pargs = new ArrayList<String>();
pargs.add("/bin/bash");
pargs.add(script);
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index ecbafa7..0f4c9f5 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -278,6 +278,7 @@
configuration.setMetadataNode(asterixInstanceName + "_" + metadataNodeId);
List<Store> stores = new ArrayList<Store>();
String storeDir = cluster.getStore().trim();
+ String nodeFeedPort;
for (Node node : cluster.getNode()) {
String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
String[] nodeIdDevice = iodevices.split(",");
@@ -287,7 +288,9 @@
}
//remove last comma
nodeStores.deleteCharAt(nodeStores.length() - 1);
- stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeStores.toString()));
+ nodeFeedPort = node.getNcMessagingPort() == null ? String.valueOf(cluster.getNcMessagingPort().intValue())
+ : String.valueOf(node.getNcMessagingPort().intValue());
+ stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeFeedPort, nodeStores.toString()));
}
configuration.setStore(stores);
List<Coredump> coredump = new ArrayList<Coredump>();
@@ -354,6 +357,7 @@
private static void zipDir(File sourceDir, final File destFile, ZipOutputStream zos) throws IOException {
File[] dirList = sourceDir.listFiles(new FileFilter() {
+ @Override
public boolean accept(File f) {
return !f.getName().endsWith(destFile.getName());
}
@@ -391,7 +395,7 @@
byte[] buffer = new byte[2048];
int read;
while (entries.hasMoreElements()) {
- JarEntry entry = (JarEntry) entries.nextElement();
+ JarEntry entry = entries.nextElement();
String name = entry.getName();
if (name.equals(origFile)) {
continue;
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 7801fd7..38b4824 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -287,5 +287,10 @@
<artifactId>rxjava</artifactId>
<version>1.0.15</version>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.0.33.Final</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
new file mode 100644
index 0000000..7796082
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+
+public class FeedChannelManager extends Thread {
+
+ public FeedChannelManager(AsterixFeedProperties feedProperties, String nodeId) {
+ }
+
+ @Override
+ public void run() {
+ }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
index 5095e7d..3965c6b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
@@ -45,6 +45,8 @@
private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
+ private FeedChannelManager feedChannelManager;
+
private final IFeedSubscriptionManager feedSubscriptionManager;
private final IFeedConnectionManager feedConnectionManager;
@@ -76,9 +78,11 @@
? AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp() : "localhost";
this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
+ this.feedChannelManager = new FeedChannelManager(feedProperties, nodeId);
try {
this.feedMessageService.start();
this.nodeLoadReportService.start();
+ this.feedChannelManager.start();
} catch (Exception e) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Unable to start feed services " + e.getMessage());
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
new file mode 100644
index 0000000..22c9a94
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class FeedMessageHandler extends ChannelInboundHandlerAdapter {
+
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
new file mode 100644
index 0000000..171b925
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+public class FeedMessageServer {
+ private final int port;
+ private final int numberOfChannels;
+
+ public FeedMessageServer(int port, int numOfClusterNodes) {
+ this.port = port;
+ this.numberOfChannels = numOfClusterNodes;
+ }
+
+ public void start() throws Exception {
+ EventLoopGroup bossGroup = new NioEventLoopGroup();
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ public void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(new FeedMessageHandler());
+ }
+ }).option(ChannelOption.SO_BACKLOG, numberOfChannels).childOption(ChannelOption.SO_KEEPALIVE, true);
+ ChannelFuture f = b.bind(port).sync();
+ f.channel().closeFuture().sync();
+ } finally {
+ workerGroup.shutdownGracefully();
+ bossGroup.shutdownGracefully();
+ }
+ }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index 870a6df..bb78233 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -38,10 +38,9 @@
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
public class IndexingScheduler {
- private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
+ private static final Logger LOGGER = Logger.getLogger(IndexingScheduler.class.getName());
/** a list of NCs */
private String[] NCs;
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 9559394..4037eaf 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
MasterNode masterNode = cluster.getMasterNode();
Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
- masterNode.getLogDir(), null, null, null);
+ masterNode.getLogDir(), null, null, null, null);
ipAddresses.add(masterNode.getClusterIp());
valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-installer/src/main/resources/clusters/demo/demo.xml b/asterix-installer/src/main/resources/clusters/demo/demo.xml
index 1932fcc..8d5f13b 100644
--- a/asterix-installer/src/main/resources/clusters/demo/demo.xml
+++ b/asterix-installer/src/main/resources/clusters/demo/demo.xml
@@ -32,6 +32,7 @@
<log_dir>/tmp/asterix/logs</log_dir>
<store>storage</store>
<java_home></java_home>
+ <nc_messaging_port>5403</nc_messaging_port>
<master_node>
<id>master</id>
<client_ip>127.0.0.1</client_ip>
@@ -45,11 +46,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node1/1,/tmp/asterix/node1/2</iodevices>
+ <nc_messaging_port>5401</nc_messaging_port>
</node>
<node>
<id>node2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/node2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/node2/1,/tmp/asterix/node2/2</iodevices>
+ <nc_messaging_port>5402</nc_messaging_port>
</node>
</cluster>
diff --git a/asterix-installer/src/main/resources/clusters/local/local.xml b/asterix-installer/src/main/resources/clusters/local/local.xml
index 20f697f..7ed8b74 100644
--- a/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -40,6 +40,7 @@
<result_time_to_live>86400000</result_time_to_live>
<!-- The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute) -->
<result_sweep_threshold>60000</result_sweep_threshold>
+ <nc_messaging_port>5403</nc_messaging_port>
<master_node>
<id>master</id>
<client_ip>127.0.0.1</client_ip>
@@ -53,12 +54,13 @@
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
-
+ <nc_messaging_port>5401</nc_messaging_port>
</node>
<node>
<id>nc2</id>
<cluster_ip>127.0.0.1</cluster_ip>
<txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
<iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
+ <nc_messaging_port>5402</nc_messaging_port>
</node>
</cluster>
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 3e37694..e7bf3bf 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -62,9 +62,10 @@
private ClusterManager() {
Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster();
- String eventHome = asterixCluster == null ? null : asterixCluster.getWorkingDir().getDir();
+ String eventHome = asterixCluster == null ? null
+ : asterixCluster.getWorkingDir() == null ? null : asterixCluster.getWorkingDir().getDir();
- if (asterixCluster != null) {
+ if (eventHome != null) {
String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml");
Configuration configuration = null;
@@ -74,8 +75,8 @@
Unmarshaller unmarshaller = configCtx.createUnmarshaller();
configuration = (Configuration) unmarshaller.unmarshal(configFile);
AsterixEventService.initialize(configuration, asterixDir, eventHome);
- client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE
- .getCluster());
+ client = AsterixEventService
+ .getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
lookupService = ServiceProvider.INSTANCE.getLookupService();
if (!lookupService.isRunning(configuration)) {
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 3173525..faa0236 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -96,6 +96,7 @@
private Set<String> failedNodes = new HashSet<>();
private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+ private HashMap<String, String> node2Ip = null;
private AsterixClusterProperties() {
InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -686,4 +687,20 @@
}
return stateDescription;
}
-}
+
+ public String getNodeIpAddress(String nodeId) {
+ if (node2Ip == null) {
+ synchronized (this) {
+ if (node2Ip == null) {
+ node2Ip = new HashMap<>();
+ String instanceName = cluster.getInstanceName();
+ List<Node> nodes = cluster.getNode();
+ for (Node node : nodes) {
+ node2Ip.put(instanceName + "_" + node.getId(), node.getClusterIp());
+ }
+ }
+ }
+ }
+ return node2Ip.get(nodeId);
+ }
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@
//if no response available or it has an exception, request a new one
if (reponse == null || reponse.getException() != null) {
ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
- ((INCMessageBroker) appCtx.getMessageBroker()).sendMessage(msg, this);
+ ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
if (reponse.getException() != null) {
throw new HyracksDataException(reponse.getException().getMessage());
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
index f9d10af..c834128 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
@@ -1358,7 +1358,7 @@
}
//remove last comma
nodeStores.deleteCharAt(nodeStores.length() - 1);
- stores.add(new Store(node.getId(), nodeStores.toString()));
+ stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
}
configuration.setStore(stores);
List<Coredump> coredump = new ArrayList<Coredump>();
@@ -1369,10 +1369,13 @@
coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" + File.separator));
txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir(); //node or cluster-wide
- txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogDir
- + (txnLogDir.charAt(txnLogDir.length() - 1) == File.separatorChar ? File.separator : "")
- + "txnLogs" //if the string doesn't have a trailing / add one
- + File.separator));
+ txnLogDirs
+ .add(new TransactionLogDir(node.getId(),
+ txnLogDir
+ + (txnLogDir.charAt(txnLogDir.length() - 1) == File.separatorChar ? File.separator
+ : "")
+ + "txnLogs" //if the string doesn't have a trailing / add one
+ + File.separator));
}
configuration.setMetadataNode(metadataNodeId);
configuration.setCoredump(coredump);
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,
I'd like you to reexamine a change. Please visit
https://asterix-gerrit.ics.uci.edu/657
to look at the new patch set (#3).
Change subject: Introduce NC to NC Messaging
......................................................................
Introduce NC to NC Messaging
This change introduces NC to NC messaging. NC message
broker takes care of listening to NC message port and
sending messages to other NCs.
Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageHandlerPool.java
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageProcessor.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
M asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/1node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/2node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/4node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/8node.xml
M asterix-external-data/pom.xml
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M asterix-yarn/src/main/resources/configs/local.xml
M asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
34 files changed, 718 insertions(+), 98 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/3
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 4: Verified+1
Build Successful
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1071/ : SUCCESS
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 2:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1069/
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: WIP(nc2nc messaging)
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: WIP(nc2nc messaging)
......................................................................
Patch Set 5:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1515/
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Ian Maxon (Code Review)" <do...@asterixdb.incubator.apache.org>.
Ian Maxon has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 4: Verified-1
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 3: Verified-1
Build Failed
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1070/ : ABORTED
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 1:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/870/
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 4:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1071/
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,
I'd like you to reexamine a change. Please visit
https://asterix-gerrit.ics.uci.edu/657
to look at the new patch set (#4).
Change subject: Introduce NC to NC Messaging
......................................................................
Introduce NC to NC Messaging
This change introduces NC to NC messaging. NC message
broker takes care of listening to NC message port and
sending messages to other NCs.
Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageHandlerPool.java
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageProcessor.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
M asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/1node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/2node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/4node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/8node.xml
M asterix-external-data/pom.xml
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M asterix-yarn/src/main/resources/configs/local.xml
M asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
34 files changed, 715 insertions(+), 98 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/4
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Michael Blow (Code Review)" <do...@asterixdb.incubator.apache.org>.
Michael Blow has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 1:
(8 comments)
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java:
Line 62: cause.printStackTrace();
remove printStackTrace()
Line 80: future.cause().printStackTrace();
remove printStackTrace()
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java:
Line 41: System.err.println("channelRead Called");
remove println
Line 47: System.out.println(buf.readInt());
remove println
Line 54: cause.printStackTrace();
remove printStackTrace
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java:
Line 69: th.printStackTrace();
remove printStackTrace()
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-common/src/main/resources/schema/asterix-conf.xsd
File asterix-common/src/main/resources/schema/asterix-conf.xsd:
Line 62:
ws
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java:
Line 90: e.printStackTrace();
remove printStackTrace()
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-HasComments: Yes
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Murtadha Hubail has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 1:
(24 comments)
Remove topic.
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
File asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java:
Line 89: }
revert this file
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
File asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java:
Line 117: systemState = recoveryMgr.getSystemState();
revert this file
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java:
Line 31: System.err.println("NCMessageDecoder: decode called");
Use logger.
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java:
Line 62: cause.printStackTrace();
Use logger to print the exception message
Line 77: if (future.isSuccess()) {
use try-finally and put future.channel().close() in the finally block.
Line 80: future.cause().printStackTrace();
Use logger to print the exception message
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java:
Line 54: cause.printStackTrace();
Use logger to print the exception message
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java:
Line 32: private int port;
final
Line 33: private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
final
Line 34: private ChannelFuture f;
rename to channelFuture
Line 64: // In this example, this does not happen, but you can do that to gracefully
Remove comment after first line
Line 68: // Do something
remove comment
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/resources/cluster.xml
File asterix-app/src/main/resources/cluster.xml:
Line 35: <nc_messaging_port>4501</nc_messaging_port>
rename to messaging_port
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-common/src/main/resources/schema/asterix-conf.xsd
File asterix-common/src/main/resources/schema/asterix-conf.xsd:
Line 60: name="nc_messaging_port"
rename to messagingPort (not that I like it, but just to be consistent with the other names)
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
File asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java:
Line 27:
revert this file
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
File asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java:
Line 281: String nodeFeedPort;
rename to nodeMessagingPort
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/pom.xml
File asterix-external-data/pom.xml:
Line 291: <groupId>io.netty</groupId>
open a JIRA to add the library license if needed
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java:
Line 19: package org.apache.asterix.external.feed.management;
remove this file
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java:
Line 48: private FeedChannelManager feedChannelManager;
remove variable and its usage.
Line 90: e.printStackTrace();
why catch this exception? shouldn't it be illegalState to prevent the NC from completing its startup in an invalid state?
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java:
Line 20:
this class is not needed anymore, right?
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java:
Line 29:
this class is not needed anymore, right?
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
File asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java:
Line 213: }
you need to add the check here for messaging port if it isn't defined on both the node and the cluster level.
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
File asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java:
Line 1361: stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
you need to update the yarn cluster file and pass the correct port.
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-HasComments: Yes
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 1: Verified+1
Build Successful
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/870/ : SUCCESS
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,
I'd like you to reexamine a change. Please visit
https://asterix-gerrit.ics.uci.edu/657
to look at the new patch set (#2).
Change subject: Introduce NC to NC Messaging
......................................................................
Introduce NC to NC Messaging
This change introduces NC to NC messaging. NC message
broker takes care of listening to NC message port and
sending messages to other NCs.
Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageHandlerPool.java
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageProcessor.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
M asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/1node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/2node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/4node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/8node.xml
M asterix-external-data/pom.xml
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M asterix-yarn/src/main/resources/configs/local.xml
M asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
35 files changed, 751 insertions(+), 98 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/2
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Change in asterixdb[master]: WIP(nc2nc messaging)
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has abandoned this change.
Change subject: WIP(nc2nc messaging)
......................................................................
Abandoned
This change was moved to https://asterix-gerrit.ics.uci.edu/#/c/897/
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: abandon
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Ian Maxon (Code Review)" <do...@asterixdb.incubator.apache.org>.
Ian Maxon has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 4: -Verified
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 3:
Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1070/
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 1:
(32 comments)
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
File asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java:
Line 89: }
> revert this file
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
File asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java:
Line 117: systemState = recoveryMgr.getSystemState();
> revert this file
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java:
Line 31: System.err.println("NCMessageDecoder: decode called");
> Use logger.
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java:
Line 62: cause.printStackTrace();
> remove printStackTrace()
Done
Line 62: cause.printStackTrace();
> Use logger to print the exception message
Done
Line 77: if (future.isSuccess()) {
> use try-finally and put future.channel().close() in the finally block.
Done
Line 80: future.cause().printStackTrace();
> remove printStackTrace()
Done
Line 80: future.cause().printStackTrace();
> Use logger to print the exception message
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java:
Line 41: System.err.println("channelRead Called");
> remove println
Done
Line 47: System.out.println(buf.readInt());
> remove println
Done
Line 54: cause.printStackTrace();
> remove printStackTrace
Done
Line 54: cause.printStackTrace();
> Use logger to print the exception message
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java:
Line 32: private int port;
> final
Done
Line 33: private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
> final
Done
Line 34: private ChannelFuture f;
> rename to channelFuture
Done
Line 64: // In this example, this does not happen, but you can do that to gracefully
> Remove comment after first line
Done
Line 68: // Do something
> remove comment
Done
Line 69: th.printStackTrace();
> remove printStackTrace()
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/resources/cluster.xml
File asterix-app/src/main/resources/cluster.xml:
Line 35: <nc_messaging_port>4501</nc_messaging_port>
> rename to messaging_port
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-common/src/main/resources/schema/asterix-conf.xsd
File asterix-common/src/main/resources/schema/asterix-conf.xsd:
Line 60: name="nc_messaging_port"
> rename to messagingPort (not that I like it, but just to be consistent with
Done
Line 62:
> ws
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
File asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java:
Line 27:
> revert this file
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
File asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java:
Line 281: String nodeFeedPort;
> rename to nodeMessagingPort
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/pom.xml
File asterix-external-data/pom.xml:
Line 291: <groupId>io.netty</groupId>
> open a JIRA to add the library license if needed
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java:
Line 19: package org.apache.asterix.external.feed.management;
> remove this file
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java:
Line 48: private FeedChannelManager feedChannelManager;
> remove variable and its usage.
Done
Line 90: e.printStackTrace();
> why catch this exception? shouldn't it be illegalState to prevent the NC fr
Done
Line 90: e.printStackTrace();
> remove printStackTrace()
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java:
Line 20:
> this class is not needed anymore, right?
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java:
Line 29:
> this class is not needed anymore, right?
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
File asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java:
Line 213: }
> you need to add the check here for messaging port if it isn't defined on bo
Done
https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
File asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java:
Line 1361: stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
> you need to update the yarn cluster file and pass the correct port.
Done
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 4:
Guys,
Please, look at this change and let's get it in.
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: Introduce NC to NC Messaging
Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.
Change subject: Introduce NC to NC Messaging
......................................................................
Patch Set 2: Verified-1
Build Failed
https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1069/ : ABORTED
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No
Change in asterixdb[master]: WIP(nc2nc messaging)
Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,
I'd like you to reexamine a change. Please visit
https://asterix-gerrit.ics.uci.edu/657
to look at the new patch set (#5).
Change subject: WIP(nc2nc messaging)
......................................................................
WIP(nc2nc messaging)
Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-app/src/main/resources/cluster.xml
M asterixdb/asterix-common/pom.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/MessageHandlerPool.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/MessageProcessor.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingClient.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingClientHandler.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingHandler.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingServer.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterixdb/asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
M asterixdb/asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
40 files changed, 1,021 insertions(+), 155 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/5
--
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>