You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org> on 2016/02/23 16:14:15 UTC

Change in asterixdb[master]: Introduce NC to NC Messaging

abdullah alamoudi has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/657

Change subject: Introduce NC to NC Messaging
......................................................................

Introduce NC to NC Messaging

This change introduces NC to NC messaging. The NC message
broker takes care of listening on the NC messaging port and
sending messages to other NCs.

Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
M asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
A asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-external-data/pom.xml
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
A asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
33 files changed, 617 insertions(+), 59 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/1

diff --git a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index e33aed2..e3493c6 100644
--- a/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ b/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -87,7 +87,7 @@
             if (tempPath.endsWith(File.separator)) {
                 tempPath = tempPath.substring(0, tempPath.length() - 1);
             }
-            //get initial partitions from properties
+            // get initial partitions from properties
             String[] nodeStores = propertiesAccessor.getStores().get(ncName);
             if (nodeStores == null) {
                 throw new Exception("Coudn't find stores for NC: " + ncName);
@@ -97,7 +97,7 @@
                 tempDirPath += File.separator;
             }
             for (int p = 0; p < nodeStores.length; p++) {
-                //create IO devices based on stores
+                // create IO devices based on stores
                 String iodevicePath = tempDirPath + ncConfig1.nodeId + File.separator + nodeStores[p];
                 File ioDeviceDir = new File(iodevicePath);
                 ioDeviceDir.mkdirs();
diff --git a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
index 4922ae6..58523f4 100644
--- a/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
+++ b/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
@@ -122,7 +122,7 @@
 
             if (replicationEnabled) {
                 if (systemState == SystemState.NEW_UNIVERSE || systemState == SystemState.CORRUPTED) {
-                    //Try to perform remote recovery
+                    // Try to perform remote recovery
                     IRemoteRecoveryManager remoteRecoveryMgr = runtimeContext.getRemoteRecoveryManager();
                     if (autoFailover) {
                         remoteRecoveryMgr.startFailbackProcess();
@@ -153,13 +153,13 @@
     }
 
     private void startReplicationService() {
-        //Open replication channel
+        // Open replication channel
         runtimeContext.getReplicationChannel().start();
 
-        //Check the state of remote replicas
+        // Check the state of remote replicas
         runtimeContext.getReplicationManager().initializeReplicasState();
 
-        //Start replication after the state of remote replicas has been initialized.
+        // Start replication after the state of remote replicas has been initialized.
         runtimeContext.getReplicationManager().startReplicationThreads();
     }
 
@@ -176,10 +176,10 @@
                 MetadataBootstrap.stopUniverse();
             }
 
-            //Clean any temporary files
+            // Clean any temporary files
             performLocalCleanUp();
 
-            //Note: stopping recovery manager will make a sharp checkpoint
+            // Note: stopping recovery manager will make a sharp checkpoint
             ncApplicationContext.getLifeCycleComponentManager().stopAll(false);
             runtimeContext.deinitialize();
         } else {
@@ -191,7 +191,7 @@
 
     @Override
     public void notifyStartupComplete() throws Exception {
-        //Send max resource id on this NC to the CC
+        // Send max resource id on this NC to the CC
         ((INCMessageBroker) ncApplicationContext.getMessageBroker()).reportMaxResourceId();
 
         AsterixMetadataProperties metadataProperties = ((IAsterixPropertiesProvider) runtimeContext)
@@ -245,15 +245,15 @@
             }
         }
 
-        //Clean any temporary files
+        // Clean any temporary files
         performLocalCleanUp();
     }
 
     private void performLocalCleanUp() {
-        //Delete working area files from failed jobs
+        // Delete working area files from failed jobs
         runtimeContext.getIOManager().deleteWorkspaceFiles();
 
-        //Reclaim storage for temporary datasets.
+        // Reclaim storage for temporary datasets.
         String storageDirName = AsterixClusterProperties.INSTANCE.getStorageDirectoryName();
         String[] ioDevices = ((PersistentLocalResourceRepository) runtimeContext.getLocalResourceRepository())
                 .getStorageMountingPoints();
@@ -263,9 +263,9 @@
             FileUtils.deleteQuietly(new File(tempDatasetsDir));
         }
 
-        //TODO
-        //Reclaim storage for orphaned index artifacts in NCs.
-        //Note: currently LSM indexes invalid components are deleted when an index is activated.
+        // TODO
+        // Reclaim storage for orphaned index artifacts in NCs.
+        // Note: currently LSM indexes invalid components are deleted when an index is activated.
     }
 
     private void updateOnNodeJoin() {
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
index 0a0a917..de3848e 100644
--- a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
@@ -24,7 +24,7 @@
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
+import org.apache.asterix.api.common.AsterixAppRuntimeContext;
 import org.apache.asterix.common.messaging.AbstractApplicationMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackRequestMessage;
 import org.apache.asterix.common.messaging.CompleteFailbackResponseMessage;
@@ -43,6 +43,7 @@
 import org.apache.asterix.common.replication.ReplicaEvent;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.bootstrap.MetadataIndexImmutableProperties;
+import org.apache.asterix.om.util.AsterixClusterProperties;
 import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.api.util.JavaSerializationUtils;
@@ -54,16 +55,21 @@
     private final NodeControllerService ncs;
     private final AtomicLong messageId = new AtomicLong(0);
     private final Map<Long, IApplicationMessageCallback> callbacks;
-    private final IAsterixAppRuntimeContext appContext;
+    private final NCMessagingServer messageServer;
+
+    private final AsterixAppRuntimeContext appContext;
 
     public NCMessageBroker(NodeControllerService ncs) {
         this.ncs = ncs;
-        appContext = (IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
         callbacks = new ConcurrentHashMap<Long, IApplicationMessageCallback>();
+        // Start listening to NC messages
+        appContext = (AsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
+        messageServer = new NCMessagingServer(appContext.getMetadataProperties().getNodeMessagingPort(ncs.getId()));
+        messageServer.start();
     }
 
     @Override
-    public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
+    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception {
         if (callback != null) {
             long uniqueMessageId = messageId.incrementAndGet();
             message.setId(uniqueMessageId);
@@ -73,7 +79,7 @@
             ncs.sendApplicationMessageToCC(JavaSerializationUtils.serialize(message), null);
         } catch (Exception e) {
             if (callback != null) {
-                //remove the callback in case of failure
+                // remove the callback in case of failure
                 callbacks.remove(message.getId());
             }
             throw e;
@@ -131,7 +137,7 @@
             //send response after takeover is completed
             TakeoverPartitionsResponseMessage reponse = new TakeoverPartitionsResponseMessage(msg.getRequestId(),
                     appContext.getTransactionSubsystem().getId(), msg.getPartitions());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
         }
     }
 
@@ -142,18 +148,18 @@
         } finally {
             TakeoverMetadataNodeResponseMessage reponse = new TakeoverMetadataNodeResponseMessage(
                     appContext.getTransactionSubsystem().getId());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
         }
     }
 
     @Override
     public void reportMaxResourceId() throws Exception {
         ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage();
-        //resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
+        // resource ids < FIRST_AVAILABLE_USER_DATASET_ID are reserved for metadata indexes.
         long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
                 MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
         maxResourceIdMsg.setMaxResourceId(maxResourceId);
-        sendMessage(maxResourceIdMsg, null);
+        sendMessageToCC(maxResourceIdMsg, null);
     }
 
     private void handleReplicaEvent(IMessage message) {
@@ -195,7 +201,7 @@
         //send response after partitions prepared for failback
         PreparePartitionsFailbackResponseMessage reponse = new PreparePartitionsFailbackResponseMessage(msg.getPlanId(),
                 msg.getRequestId(), msg.getPartitions());
-        sendMessage(reponse, null);
+        sendMessageToCC(reponse, null);
     }
 
     private void handleCompleteFailbackRequest(IMessage message) throws Exception {
@@ -206,7 +212,33 @@
         } finally {
             CompleteFailbackResponseMessage reponse = new CompleteFailbackResponseMessage(msg.getPlanId(),
                     msg.getRequestId(), msg.getPartitions());
-            sendMessage(reponse, null);
+            sendMessageToCC(reponse, null);
         }
     }
+
+    /**
+     * Opens a messaging connection to the node controller identified by {@code to}.
+     * Both the address and the messaging port are looked up for the destination node,
+     * so the connection reaches the destination's messaging server rather than the
+     * one listening on this node's own port.
+     *
+     * @param to
+     *            id of the destination node controller
+     * @param message
+     *            the message to deliver. NOTE(review): not handed to the client yet -- confirm this is
+     *            intentionally left for a later patch set.
+     * @param callback
+     *            callback for the response. NOTE(review): currently unused here.
+     * @throws Exception
+     *             propagated from the node lookups or from {@link NCMessagingClient#start()}
+     */
+    @Override
+    public void sendMessageToNC(String to, IApplicationMessage message, IApplicationMessageCallback callback)
+            throws Exception {
+        // The last argument (4) is the size passed through to the client's handler.
+        NCMessagingClient client = new NCMessagingClient(AsterixClusterProperties.INSTANCE.getNodeIpAddress(to),
+                appContext.getMetadataProperties().getNodeMessagingPort(to), 4);
+        // Blocks until the connection opened by the client has been closed again.
+        client.start();
+    }
 }
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
new file mode 100644
index 0000000..e34a0f3
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ReplayingDecoder;
+
+/**
+ * Decoder that hands the inbound stream to the next handler in chunks of
+ * {@code CHUNK_SIZE} bytes.
+ */
+public class NCMessageDecoder extends ReplayingDecoder<Void> {
+
+    // Number of bytes read from the inbound buffer for every emitted chunk.
+    private static final int CHUNK_SIZE = 4;
+
+    /**
+     * Reads {@code CHUNK_SIZE} bytes from {@code in} and appends them to {@code out} as one buffer.
+     */
+    @Override
+    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
+        System.err.println("NCMessageDecoder: decode called");
+        final ByteBuf chunk = in.readBytes(CHUNK_SIZE);
+        out.add(chunk);
+    }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
new file mode 100644
index 0000000..63eb888
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import javax.net.ssl.SSLException;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Netty based client used to open a connection to a messaging endpoint.
+ */
+public class NCMessagingClient {
+    private final String host;
+    private final int port;
+    private final int size;
+
+    /**
+     * @param host
+     *            host to connect to
+     * @param port
+     *            port to connect to
+     * @param size
+     *            value passed to the {@link NCMessagingClientHandler} created for the connection
+     */
+    public NCMessagingClient(String host, int port, int size) {
+        this.host = host;
+        this.port = port;
+        this.size = size;
+    }
+
+    /**
+     * Connects to {@code host:port} and blocks until the resulting channel has been closed.
+     * The event loop group created for the call is asked to shut down on every exit path.
+     */
+    public void start() throws SSLException, InterruptedException {
+        final EventLoopGroup workers = new NioEventLoopGroup();
+        try {
+            final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
+                @Override
+                protected void initChannel(SocketChannel ch) throws Exception {
+                    final ChannelPipeline pipeline = ch.pipeline();
+                    pipeline.addLast(new NCMessagingClientHandler(size));
+                }
+            };
+
+            final Bootstrap bootstrap = new Bootstrap();
+            bootstrap.group(workers);
+            bootstrap.channel(NioSocketChannel.class);
+            bootstrap.handler(initializer);
+
+            // Wait for the connection attempt to finish ...
+            final ChannelFuture connected = bootstrap.connect(host, port).sync();
+
+            // ... then wait for the channel to be closed.
+            connected.channel().closeFuture().sync();
+        } finally {
+            workers.shutdownGracefully();
+        }
+    }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
new file mode 100644
index 0000000..40dd153
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+/**
+ * Handler of the client side of an NC messaging connection. When the channel becomes
+ * active it writes one buffer containing a single int and closes the channel as soon
+ * as that write has completed.
+ */
+public class NCMessagingClientHandler extends SimpleChannelInboundHandler<Object> {
+
+    private ByteBuf content;
+    private ChannelHandlerContext ctx;
+    private int size;
+
+    long counter;
+
+    // Runs when the write issued in channelActive completes: a failed write has its
+    // cause printed, and the channel is closed in either case.
+    private final ChannelFutureListener trafficGenerator = new ChannelFutureListener() {
+        @Override
+        public void operationComplete(ChannelFuture future) {
+            if (!future.isSuccess()) {
+                future.cause().printStackTrace();
+            }
+            future.channel().close();
+        }
+    };
+
+    /**
+     * @param size
+     *            initial capacity of the direct buffer allocated for the outgoing message
+     */
+    public NCMessagingClientHandler(int size) {
+        this.size = size;
+    }
+
+    /**
+     * Allocates the message buffer, writes the int 10 into it and sends it.
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+        content = ctx.alloc().directBuffer(size);
+        content.writeInt(10);
+
+        // The duplicate is retained for the write so that this handler keeps its own
+        // reference, which is released in channelInactive.
+        final ByteBuf outgoing = content.duplicate().retain();
+        this.ctx.writeAndFlush(outgoing).addListener(trafficGenerator);
+    }
+
+    /**
+     * Releases the message buffer once the channel is no longer active.
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        content.release();
+    }
+
+    @Override
+    public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+        // Nothing is expected from the server; anything it does send is ignored.
+    }
+
+    /**
+     * Prints the failure and closes the connection.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
new file mode 100644
index 0000000..3cb6aaa
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+/**
+ * Server-side handler that accumulates inbound bytes and, once at least 4 bytes are
+ * available, prints them as an int and closes the connection.
+ */
+public class NCMessagingHandler extends ChannelInboundHandlerAdapter {
+    // Accumulates inbound bytes; allocated in handlerAdded and released in handlerRemoved.
+    private ByteBuf buf;
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) {
+        buf = ctx.alloc().buffer(4);
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) {
+        buf.release();
+        buf = null;
+    }
+
+    /**
+     * Appends the received bytes to the accumulation buffer and handles the message
+     * once 4 bytes are readable.
+     *
+     * @param ctx
+     *            context of the channel the message arrived on
+     * @param msg
+     *            the inbound message; cast to {@link ByteBuf} and released by this method
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+        System.err.println("channelRead Called");
+        ByteBuf m = (ByteBuf) msg;
+        try {
+            buf.writeBytes(m);
+        } finally {
+            // Release the inbound buffer even if the copy fails so it cannot leak.
+            m.release();
+        }
+
+        if (buf.readableBytes() >= 4) {
+            System.out.println(buf.readInt());
+            ctx.close();
+        }
+    }
+
+    /**
+     * Prints the failure and closes the connection.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}
diff --git a/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
new file mode 100644
index 0000000..dbf4c0e
--- /dev/null
+++ b/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.messaging;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+/**
+ * Netty server that accepts NC messaging connections on a given port. The server runs
+ * on its own thread, started by {@link #start()}.
+ */
+public class NCMessagingServer {
+    private int port;
+    // NOTE(review): not referenced in this class; initChannel creates a handler per connection.
+    private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
+    // Bind future of the server channel. Written by the server thread and read by stop(),
+    // hence volatile.
+    private volatile ChannelFuture f;
+
+    /**
+     * @param port
+     *            port to listen on
+     */
+    public NCMessagingServer(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Closes the server channel, which lets the server thread leave its wait and shut
+     * down its event loops. Does nothing if the server has not been bound yet.
+     */
+    public void stop() {
+        ChannelFuture bound = f;
+        if (bound != null) {
+            bound.channel().close();
+        }
+    }
+
+    /**
+     * Starts a thread that binds the port and serves connections until the server channel is closed.
+     */
+    public void start() {
+        Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                EventLoopGroup bossGroup = new NioEventLoopGroup();
+                EventLoopGroup workerGroup = new NioEventLoopGroup();
+                try {
+                    ServerBootstrap b = new ServerBootstrap();
+                    b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                            .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        public void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast(new NCMessageDecoder(), new NCMessagingHandler());
+                        }
+                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
+
+                    // Bind and start to accept incoming connections. The future is stored in the
+                    // field (not a local) so that stop() can reach the server channel.
+                    f = b.bind(port).sync();
+
+                    // Block this thread until the server channel is closed, e.g. by stop().
+                    f.channel().closeFuture().sync();
+                } catch (Throwable th) {
+                    // Bind failures and interruption end up here; the server thread then terminates.
+                    th.printStackTrace();
+                } finally {
+                    workerGroup.shutdownGracefully();
+                    bossGroup.shutdownGracefully();
+                }
+            }
+        });
+        thread.setName("NCMessagingService:" + port);
+        thread.start();
+    }
+}
diff --git a/asterix-app/src/main/resources/asterix-build-configuration.xml b/asterix-app/src/main/resources/asterix-build-configuration.xml
index 731113b..2dd5cfc 100644
--- a/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -20,10 +20,12 @@
 	<metadataNode>asterix_nc1</metadataNode>
 	<store>
 		<ncId>asterix_nc1</ncId>
+        <nc_messaging_port>4501</nc_messaging_port>
 		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<store>
 		<ncId>asterix_nc2</ncId>
+        <nc_messaging_port>4502</nc_messaging_port>
 		<storeDirs>iodevice0,iodevice1</storeDirs>
 	</store>
 	<transactionLogDir>
diff --git a/asterix-app/src/main/resources/cluster.xml b/asterix-app/src/main/resources/cluster.xml
new file mode 100644
index 0000000..7c7e321
--- /dev/null
+++ b/asterix-app/src/main/resources/cluster.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<cluster xmlns="cluster">
+    <instance_name>asterix</instance_name>
+    <store>storage</store>
+    <master_node>
+        <id>master</id>
+        <client_ip>127.0.0.1</client_ip>
+        <cluster_ip>127.0.0.1</cluster_ip>
+        <client_port>1098</client_port>
+        <cluster_port>1099</cluster_port>
+        <http_port>8888</http_port>
+    </master_node>
+    <nc_messaging_port>4503</nc_messaging_port>
+    <node>
+        <id>nc1</id>
+        <cluster_ip>127.0.0.1</cluster_ip>
+        <nc_messaging_port>4501</nc_messaging_port>
+    </node>
+    <node>
+        <id>nc2</id>
+        <cluster_ip>127.0.0.1</cluster_ip>
+        <nc_messaging_port>4502</nc_messaging_port>
+    </node>
+</cluster>
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
index 473a163..289a8ed 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
@@ -69,4 +69,8 @@
     public Map<String, String> getTransactionLogDirs() {
         return accessor.getTransactionLogDirs();
     }
+
+    public int getNodeMessagingPort(String nodeId) {
+        return accessor.getNodeMessagingPort(nodeId);
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
index 13ce403..3c31936 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
@@ -57,7 +57,8 @@
     private final Map<String, String> transactionLogDirs;
     private final Map<String, String> asterixBuildProperties;
     private final Map<String, ClusterPartition[]> nodePartitionsMap;
-    private SortedMap<Integer, ClusterPartition> clusterPartitions;
+    private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+    private final Map<String, Integer> node2NCMessagingPort;
 
     public AsterixPropertiesAccessor() throws AsterixException {
         String fileName = System.getProperty(GlobalConfig.CONFIG_FILE_PROPERTY);
@@ -86,7 +87,9 @@
         instanceName = asterixConfiguration.getInstanceName();
         metadataNodeName = asterixConfiguration.getMetadataNode();
         stores = new HashMap<String, String[]>();
+        node2NCMessagingPort = new HashMap<>();
         List<Store> configuredStores = asterixConfiguration.getStore();
+
         nodeNames = new HashSet<String>();
         nodePartitionsMap = new HashMap<>();
         clusterPartitions = new TreeMap<>();
@@ -103,6 +106,7 @@
             stores.put(store.getNcId(), nodeStores);
             nodePartitionsMap.put(store.getNcId(), nodePartitions);
             nodeNames.add(store.getNcId());
+            node2NCMessagingPort.put(store.getNcId(), Integer.parseInt(store.getNcMessagingPort()));
         }
         asterixConfigurationParams = new HashMap<String, Property>();
         for (Property p : asterixConfiguration.getProperty()) {
@@ -208,4 +212,8 @@
     public SortedMap<Integer, ClusterPartition> getClusterPartitions() {
         return clusterPartitions;
     }
+
+    public int getNodeMessagingPort(String nodeId) {
+        return node2NCMessagingPort.get(nodeId); // NOTE(review): auto-unboxing throws NPE for an unmapped nodeId
+    }
 }
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
index 5d2e263..95fe37b 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
@@ -42,7 +42,7 @@
 
     /**
      * Sets a unique message id that identifies this message within an NC.
-     * This id is set by {@link INCMessageBroker#sendMessage(IApplicationMessage, IApplicationMessageCallback)}
+     * This id is set by {@link INCMessageBroker#sendMessageToCC(IApplicationMessage, IApplicationMessageCallback)}
      * when the callback is not null to notify the sender when the response to that message is received.
      *
      * @param messageId
diff --git a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
index 41f8a0c..724ed04 100644
--- a/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
+++ b/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
@@ -29,7 +29,17 @@
      * @param callback
      * @throws Exception
      */
-    public void sendMessage(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+    public void sendMessageToCC(IApplicationMessage message, IApplicationMessageCallback callback) throws Exception;
+
+    /**
+     * Sends an application message from this NC to the NC designated by {@code to}.
+     * @param to the destination NC; presumably its node id (TODO confirm against the implementation)
+     * @param message the application message to send
+     * @param callback presumably notified when a response arrives, as for sendMessageToCC (TODO confirm)
+     * @throws Exception
+     */
+    public void sendMessageToNC(String to, IApplicationMessage message, IApplicationMessageCallback callback)
+            throws Exception;
 
     /**
      * Sends the maximum resource id on this NC to the CC.
diff --git a/asterix-common/src/main/resources/schema/asterix-conf.xsd b/asterix-common/src/main/resources/schema/asterix-conf.xsd
index bb99319..3a66421 100644
--- a/asterix-common/src/main/resources/schema/asterix-conf.xsd
+++ b/asterix-common/src/main/resources/schema/asterix-conf.xsd
@@ -56,12 +56,16 @@
     <xs:element
         name="txnLogDirPath"
         type="xs:string" />
-
+    <xs:element
+        name="nc_messaging_port"
+        type="xs:string" />
+        
     <!-- definition of complex elements -->
     <xs:element name="store">
         <xs:complexType>
             <xs:sequence>
                 <xs:element ref="mg:ncId" />
+                <xs:element ref="mg:nc_messaging_port" />
                 <xs:element ref="mg:storeDirs" />
             </xs:sequence>
         </xs:complexType>
diff --git a/asterix-common/src/main/resources/schema/cluster.xsd b/asterix-common/src/main/resources/schema/cluster.xsd
index 935d33f..1ec9a64 100644
--- a/asterix-common/src/main/resources/schema/cluster.xsd
+++ b/asterix-common/src/main/resources/schema/cluster.xsd
@@ -57,6 +57,7 @@
     <xs:element name="result_time_to_live" type="xs:long" />
     <xs:element name="result_sweep_threshold" type="xs:long" />
     <xs:element name="cc_root" type="xs:string" />
+    <xs:element name="nc_messaging_port" type="xs:integer" />
 
 	<!-- definition of complex elements -->
 	<xs:element name="working_dir">
@@ -123,6 +124,7 @@
 				<xs:element ref="cl:txn_log_dir" minOccurs="0" />
 				<xs:element ref="cl:iodevices" minOccurs="0" />
 				<xs:element ref="cl:debug_port" minOccurs="0" />
+                <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
 			</xs:sequence>
 		</xs:complexType>
 	</xs:element>
@@ -151,6 +153,7 @@
 				<xs:element ref="cl:metadata_node" />
 				<xs:element ref="cl:data_replication" minOccurs="0" />
 				<xs:element ref="cl:master_node" />
+                <xs:element ref="cl:nc_messaging_port" minOccurs="0" />
 				<xs:element ref="cl:node" maxOccurs="unbounded" />
 				<xs:element ref="cl:substitute_nodes" />
                 <xs:element ref="cl:heartbeat_period" minOccurs="0" />
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
index 29765fd..c92262c 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
@@ -41,7 +41,7 @@
 public class EventDriver {
 
     public static final String CLIENT_NODE_ID = "client_node";
-    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null);
+    public static final Node CLIENT_NODE = new Node(CLIENT_NODE_ID, "127.0.0.1", null, null, null, null, null, null);
 
     private static String eventsDir;
     private static Events events;
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
index e0e5bc4..b8c7f7c 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
@@ -25,13 +25,12 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
-import org.apache.log4j.Logger;
-
 import org.apache.asterix.event.driver.EventDriver;
 import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.event.schema.event.Event;
 import org.apache.asterix.event.schema.pattern.Pattern;
 import org.apache.asterix.event.schema.pattern.Period;
+import org.apache.log4j.Logger;
 
 public class EventTask extends TimerTask {
 
@@ -68,8 +67,8 @@
             this.interval = EventUtil.parseTimeInterval(period.getAbsvalue(), period.getUnit());
         }
         if (pattern.getDelay() != null) {
-            this.initialDelay = EventUtil.parseTimeInterval(new ValueType(pattern.getDelay().getValue()), pattern
-                    .getDelay().getUnit());
+            this.initialDelay = EventUtil.parseTimeInterval(new ValueType(pattern.getDelay().getValue()),
+                    pattern.getDelay().getUnit());
         }
         if (pattern.getMaxOccurs() != null) {
             this.maxOccurs = pattern.getMaxOccurs();
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
index b83faa2..c764cd7 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
@@ -186,12 +186,12 @@
         }
 
         if (nodeid.equals(cluster.getMasterNode().getId())) {
-            String logDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode()
-                    .getLogDir();
-            String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome() : cluster
-                    .getMasterNode().getJavaHome();
+            String logDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
+                    : cluster.getMasterNode().getLogDir();
+            String javaHome = cluster.getMasterNode().getJavaHome() == null ? cluster.getJavaHome()
+                    : cluster.getMasterNode().getJavaHome();
             return new Node(cluster.getMasterNode().getId(), cluster.getMasterNode().getClusterIp(), javaHome, logDir,
-                    null, null, cluster.getMasterNode().getDebugPort());
+                    null, null, cluster.getMasterNode().getDebugPort(), null);
         }
 
         List<Node> nodeList = cluster.getNode();
@@ -231,8 +231,8 @@
         pb.start();
     }
 
-    public static void executeLocalScript(Node node, String script, List<String> args) throws IOException,
-            InterruptedException {
+    public static void executeLocalScript(Node node, String script, List<String> args)
+            throws IOException, InterruptedException {
         List<String> pargs = new ArrayList<String>();
         pargs.add("/bin/bash");
         pargs.add(script);
diff --git a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
index ecbafa7..0f4c9f5 100644
--- a/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
+++ b/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
@@ -278,6 +278,7 @@
         configuration.setMetadataNode(asterixInstanceName + "_" + metadataNodeId);
         List<Store> stores = new ArrayList<Store>();
         String storeDir = cluster.getStore().trim();
+        String nodeFeedPort;
         for (Node node : cluster.getNode()) {
             String iodevices = node.getIodevices() == null ? cluster.getIodevices() : node.getIodevices();
             String[] nodeIdDevice = iodevices.split(",");
@@ -287,7 +288,9 @@
             }
             //remove last comma
             nodeStores.deleteCharAt(nodeStores.length() - 1);
-            stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeStores.toString()));
+            nodeFeedPort = node.getNcMessagingPort() == null ? String.valueOf(cluster.getNcMessagingPort().intValue())
+                    : String.valueOf(node.getNcMessagingPort().intValue());
+            stores.add(new Store(asterixInstanceName + "_" + node.getId(), nodeFeedPort, nodeStores.toString()));
         }
         configuration.setStore(stores);
         List<Coredump> coredump = new ArrayList<Coredump>();
@@ -354,6 +357,7 @@
 
     private static void zipDir(File sourceDir, final File destFile, ZipOutputStream zos) throws IOException {
         File[] dirList = sourceDir.listFiles(new FileFilter() {
+            @Override
             public boolean accept(File f) {
                 return !f.getName().endsWith(destFile.getName());
             }
@@ -391,7 +395,7 @@
         byte[] buffer = new byte[2048];
         int read;
         while (entries.hasMoreElements()) {
-            JarEntry entry = (JarEntry) entries.nextElement();
+            JarEntry entry = entries.nextElement();
             String name = entry.getName();
             if (name.equals(origFile)) {
                 continue;
diff --git a/asterix-external-data/pom.xml b/asterix-external-data/pom.xml
index 7801fd7..38b4824 100644
--- a/asterix-external-data/pom.xml
+++ b/asterix-external-data/pom.xml
@@ -287,5 +287,10 @@
             <artifactId>rxjava</artifactId>
             <version>1.0.15</version>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.0.33.Final</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
new file mode 100644
index 0000000..7796082
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.management;
+
+import org.apache.asterix.common.config.AsterixFeedProperties;
+
+public class FeedChannelManager extends Thread {
+    // Placeholder thread: the constructor ignores both of its arguments and run() has an empty body.
+    public FeedChannelManager(AsterixFeedProperties feedProperties, String nodeId) {
+    }
+    // No work is done here yet, so a started instance returns from run() immediately.
+    @Override
+    public void run() {
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
index 5095e7d..3965c6b 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
@@ -45,6 +45,8 @@
 
     private static final Logger LOGGER = Logger.getLogger(FeedManager.class.getName());
 
+    private FeedChannelManager feedChannelManager;
+
     private final IFeedSubscriptionManager feedSubscriptionManager;
 
     private final IFeedConnectionManager feedConnectionManager;
@@ -76,9 +78,11 @@
                 ? AsterixClusterProperties.INSTANCE.getCluster().getMasterNode().getClusterIp() : "localhost";
         this.feedMessageService = new FeedMessageService(feedProperties, nodeId, ccClusterIp);
         this.nodeLoadReportService = new NodeLoadReportService(nodeId, this);
+        this.feedChannelManager = new FeedChannelManager(feedProperties, nodeId);
         try {
             this.feedMessageService.start();
             this.nodeLoadReportService.start();
+            this.feedChannelManager.start();
         } catch (Exception e) {
             if (LOGGER.isLoggable(Level.WARNING)) {
                 LOGGER.warning("Unable to start feed services " + e.getMessage());
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
new file mode 100644
index 0000000..22c9a94
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+public class FeedMessageHandler extends ChannelInboundHandlerAdapter {
+    // Adds nothing to ChannelInboundHandlerAdapter yet: no handler method is overridden.
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
new file mode 100644
index 0000000..171b925
--- /dev/null
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.feed.message;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+
+public class FeedMessageServer {
+    private final int port;
+    private final int numberOfChannels;
+    // numOfClusterNodes is kept as numberOfChannels and is only used as the SO_BACKLOG option value in start().
+    public FeedMessageServer(int port, int numOfClusterNodes) {
+        this.port = port;
+        this.numberOfChannels = numOfClusterNodes;
+    }
+    // Binds to the port, then blocks until the server channel closes; both event loop groups are shut down on exit.
+    public void start() throws Exception {
+        EventLoopGroup bossGroup = new NioEventLoopGroup();
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
+                    .childHandler(new ChannelInitializer<SocketChannel>() {
+                        @Override
+                        public void initChannel(SocketChannel ch) throws Exception {
+                            ch.pipeline().addLast(new FeedMessageHandler());
+                        }
+                    }).option(ChannelOption.SO_BACKLOG, numberOfChannels).childOption(ChannelOption.SO_KEEPALIVE, true);
+            ChannelFuture f = b.bind(port).sync();
+            f.channel().closeFuture().sync();
+        } finally {
+            workerGroup.shutdownGracefully();
+            bossGroup.shutdownGracefully();
+        }
+    }
+}
diff --git a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
index 870a6df..bb78233 100644
--- a/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
+++ b/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
@@ -38,10 +38,9 @@
 import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.HyracksException;
-import org.apache.hyracks.hdfs.scheduler.Scheduler;
 
 public class IndexingScheduler {
-    private static final Logger LOGGER = Logger.getLogger(Scheduler.class.getName());
+    private static final Logger LOGGER = Logger.getLogger(IndexingScheduler.class.getName());
 
     /** a list of NCs */
     private String[] NCs;
diff --git a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
index 9559394..4037eaf 100644
--- a/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
+++ b/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
@@ -105,7 +105,7 @@
 
             MasterNode masterNode = cluster.getMasterNode();
             Node master = new Node(masterNode.getId(), masterNode.getClusterIp(), masterNode.getJavaHome(),
-                    masterNode.getLogDir(), null, null, null);
+                    masterNode.getLogDir(), null, null, null, null);
             ipAddresses.add(masterNode.getClusterIp());
 
             valid = valid & validateNodeConfiguration(master, cluster);
diff --git a/asterix-installer/src/main/resources/clusters/demo/demo.xml b/asterix-installer/src/main/resources/clusters/demo/demo.xml
index 1932fcc..8d5f13b 100644
--- a/asterix-installer/src/main/resources/clusters/demo/demo.xml
+++ b/asterix-installer/src/main/resources/clusters/demo/demo.xml
@@ -32,6 +32,7 @@
 	<log_dir>/tmp/asterix/logs</log_dir>
 	<store>storage</store>
 	<java_home></java_home>
+    <nc_messaging_port>5403</nc_messaging_port>
 	<master_node>
 		<id>master</id>
 		<client_ip>127.0.0.1</client_ip>
@@ -45,11 +46,13 @@
 		<cluster_ip>127.0.0.1</cluster_ip>
 		<txn_log_dir>/tmp/asterix/node1/txnLogs</txn_log_dir>
 		<iodevices>/tmp/asterix/node1/1,/tmp/asterix/node1/2</iodevices>
+        <nc_messaging_port>5401</nc_messaging_port>
 	</node>
 	<node>
 		<id>node2</id>
 		<cluster_ip>127.0.0.1</cluster_ip>
 		<txn_log_dir>/tmp/asterix/node2/txnLogs</txn_log_dir>
 		<iodevices>/tmp/asterix/node2/1,/tmp/asterix/node2/2</iodevices>
+        <nc_messaging_port>5402</nc_messaging_port>
 	</node>
 </cluster>
diff --git a/asterix-installer/src/main/resources/clusters/local/local.xml b/asterix-installer/src/main/resources/clusters/local/local.xml
index 20f697f..7ed8b74 100644
--- a/asterix-installer/src/main/resources/clusters/local/local.xml
+++ b/asterix-installer/src/main/resources/clusters/local/local.xml
@@ -40,6 +40,7 @@
     <result_time_to_live>86400000</result_time_to_live>
     <!-- The duration within which an instance of the result cleanup should be invoked in milliseconds. (default: 1 minute) -->
     <result_sweep_threshold>60000</result_sweep_threshold>
+    <nc_messaging_port>5403</nc_messaging_port>
     <master_node>
         <id>master</id>
         <client_ip>127.0.0.1</client_ip>
@@ -53,12 +54,13 @@
         <cluster_ip>127.0.0.1</cluster_ip>
         <txn_log_dir>/tmp/asterix/nc1/txnLogs</txn_log_dir>
         <iodevices>/tmp/asterix/nc1/p1,/tmp/asterix/nc1/p2</iodevices>
-
+        <nc_messaging_port>5401</nc_messaging_port>
     </node>
     <node>
         <id>nc2</id>
         <cluster_ip>127.0.0.1</cluster_ip>
         <txn_log_dir>/tmp/asterix/nc2/txnLogs</txn_log_dir>
         <iodevices>/tmp/asterix/nc2/p1,/tmp/asterix/nc2/p2</iodevices>
+        <nc_messaging_port>5402</nc_messaging_port>
     </node>
 </cluster>
diff --git a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
index 3e37694..e7bf3bf 100644
--- a/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
+++ b/asterix-metadata/src/main/java/org/apache/asterix/metadata/cluster/ClusterManager.java
@@ -62,9 +62,10 @@
 
     private ClusterManager() {
         Cluster asterixCluster = AsterixClusterProperties.INSTANCE.getCluster();
-        String eventHome = asterixCluster == null ? null : asterixCluster.getWorkingDir().getDir();
+        String eventHome = asterixCluster == null ? null
+                : asterixCluster.getWorkingDir() == null ? null : asterixCluster.getWorkingDir().getDir();
 
-        if (asterixCluster != null) {
+        if (eventHome != null) {
             String asterixDir = System.getProperty("user.dir") + File.separator + "asterix";
             File configFile = new File(System.getProperty("user.dir") + File.separator + "configuration.xml");
             Configuration configuration = null;
@@ -74,8 +75,8 @@
                 Unmarshaller unmarshaller = configCtx.createUnmarshaller();
                 configuration = (Configuration) unmarshaller.unmarshal(configFile);
                 AsterixEventService.initialize(configuration, asterixDir, eventHome);
-                client = AsterixEventService.getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE
-                        .getCluster());
+                client = AsterixEventService
+                        .getAsterixEventServiceClient(AsterixClusterProperties.INSTANCE.getCluster());
 
                 lookupService = ServiceProvider.INSTANCE.getLookupService();
                 if (!lookupService.isRunning(configuration)) {
diff --git a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
index 3173525..faa0236 100644
--- a/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
+++ b/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
@@ -96,6 +96,7 @@
     private Set<String> failedNodes = new HashSet<>();
     private LinkedList<NodeFailbackPlan> pendingProcessingFailbackPlans;
     private Map<Long, NodeFailbackPlan> planId2FailbackPlanMap;
+    private HashMap<String, String> node2Ip = null;
 
     private AsterixClusterProperties() {
         InputStream is = this.getClass().getClassLoader().getResourceAsStream(CLUSTER_CONFIGURATION_FILE);
@@ -686,4 +687,20 @@
         }
         return stateDescription;
     }
-}
+
+    /**
+     * Returns the cluster IP registered for {@code nodeId} (keys are {@code <instanceName>_<node id>}), or null.
+     * Fully synchronized: node2Ip is a plain field, so an unlocked null check could observe a half-filled map.
+     */
+    public synchronized String getNodeIpAddress(String nodeId) {
+        if (node2Ip == null) {
+            HashMap<String, String> ips = new HashMap<>();
+            String instanceName = cluster.getInstanceName();
+            for (Node node : cluster.getNode()) {
+                ips.put(instanceName + "_" + node.getId(), node.getClusterIp());
+            }
+            node2Ip = ips; // publish only once fully populated
+        }
+        return node2Ip.get(nodeId);
+    }
+}
\ No newline at end of file
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
index 5b29530..ab1ebe1 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
@@ -57,7 +57,7 @@
             //if no response available or it has an exception, request a new one
             if (reponse == null || reponse.getException() != null) {
                 ResourceIdRequestMessage msg = new ResourceIdRequestMessage();
-                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessage(msg, this);
+                ((INCMessageBroker) appCtx.getMessageBroker()).sendMessageToCC(msg, this);
                 reponse = (ResourceIdRequestResponseMessage) resourceIdResponseQ.take();
                 if (reponse.getException() != null) {
                     throw new HyracksDataException(reponse.getException().getMessage());
diff --git a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
index f9d10af..c834128 100644
--- a/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
+++ b/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
@@ -1358,7 +1358,7 @@
             }
             //remove last comma
             nodeStores.deleteCharAt(nodeStores.length() - 1);
-            stores.add(new Store(node.getId(), nodeStores.toString()));
+            stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
         }
         configuration.setStore(stores);
         List<Coredump> coredump = new ArrayList<Coredump>();
@@ -1369,10 +1369,13 @@
             coredumpDir = node.getLogDir() == null ? cluster.getLogDir() : node.getLogDir();
             coredump.add(new Coredump(node.getId(), coredumpDir + "coredump" + File.separator));
             txnLogDir = node.getTxnLogDir() == null ? cluster.getTxnLogDir() : node.getTxnLogDir(); //node or cluster-wide
-            txnLogDirs.add(new TransactionLogDir(node.getId(), txnLogDir
-                    + (txnLogDir.charAt(txnLogDir.length() - 1) == File.separatorChar ? File.separator : "")
-                    + "txnLogs" //if the string doesn't have a trailing / add one
-                    + File.separator));
+            txnLogDirs
+                    .add(new TransactionLogDir(node.getId(),
+                            txnLogDir
+                                    + (txnLogDir.charAt(txnLogDir.length() - 1) == File.separatorChar ? File.separator
+                                            : "")
+                                    + "txnLogs" //if the string doesn't have a trailing / add one
+                                    + File.separator));
         }
         configuration.setMetadataNode(metadataNodeId);
         configuration.setCoredump(coredump);

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/657

to look at the new patch set (#3).

Change subject: Introduce NC to NC Messaging
......................................................................

Introduce NC to NC Messaging

This change introduces NC to NC messaging. NC message
broker takes care of listening to NC message port and
sending messages to other NCs.

Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageHandlerPool.java
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageProcessor.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
M asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/1node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/2node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/4node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/8node.xml
M asterix-external-data/pom.xml
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M asterix-yarn/src/main/resources/configs/local.xml
M asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
34 files changed, 718 insertions(+), 98 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/3
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 4: Verified+1

Build Successful 

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1071/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 2:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1069/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: WIP(nc2nc messaging)

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: WIP(nc2nc messaging)
......................................................................


Patch Set 5:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1515/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Ian Maxon (Code Review)" <do...@asterixdb.incubator.apache.org>.
Ian Maxon has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 4: Verified-1

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 3: Verified-1

Build Failed 

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1070/ : ABORTED

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/870/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 4:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1071/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/657

to look at the new patch set (#4).

Change subject: Introduce NC to NC Messaging
......................................................................

Introduce NC to NC Messaging

This change introduces NC to NC messaging. NC message
broker takes care of listening to NC message port and
sending messages to other NCs.

Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageHandlerPool.java
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageProcessor.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
M asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/1node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/2node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/4node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/8node.xml
M asterix-external-data/pom.xml
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M asterix-yarn/src/main/resources/configs/local.xml
M asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
34 files changed, 715 insertions(+), 98 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/4
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Michael Blow (Code Review)" <do...@asterixdb.incubator.apache.org>.
Michael Blow has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 1:

(8 comments)

https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java:

Line 62:         cause.printStackTrace();
remove printStackTrace()


Line 80:                 future.cause().printStackTrace();
remove printStackTrace()


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java:

Line 41:         System.err.println("channelRead Called");
remove println


Line 47:             System.out.println(buf.readInt());
remove println


Line 54:         cause.printStackTrace();
remove printStackTrace


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java:

Line 69:                     th.printStackTrace();
remove printStackTrace()


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-common/src/main/resources/schema/asterix-conf.xsd
File asterix-common/src/main/resources/schema/asterix-conf.xsd:

Line 62:         
Remove the whitespace-only line (ws).


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java:

Line 90:             e.printStackTrace();
remove printStackTrace()


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-HasComments: Yes

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Murtadha Hubail (Code Review)" <do...@asterixdb.incubator.apache.org>.
Murtadha Hubail has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 1:

(24 comments)

Remove topic.

https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
File asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java:

Line 89:             }
revert this file


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
File asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java:

Line 117:             systemState = recoveryMgr.getSystemState();
revert this file


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java:

Line 31:         System.err.println("NCMessageDecoder: decode called");
Use logger.


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java:

Line 62:         cause.printStackTrace();
Use logger to print the exception message


Line 77:             if (future.isSuccess()) {
use try-finally and put future.channel().close() in the finally block.


Line 80:                 future.cause().printStackTrace();
Use logger to print the exception message


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java:

Line 54:         cause.printStackTrace();
Use logger to print the exception message


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java:

Line 32:     private int port;
Make this field final.


Line 33:     private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
Make this field final.


Line 34:     private ChannelFuture f;
rename to channelFuture


Line 64:                     // In this example, this does not happen, but you can do that to gracefully
Remove comment after first line


Line 68:                     // Do something
remove comment


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/resources/cluster.xml
File asterix-app/src/main/resources/cluster.xml:

Line 35:         <nc_messaging_port>4501</nc_messaging_port>
rename to messaging_port


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-common/src/main/resources/schema/asterix-conf.xsd
File asterix-common/src/main/resources/schema/asterix-conf.xsd:

Line 60:         name="nc_messaging_port"
rename to messagingPort (not that I like it, but just to be consistent with the other names)


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
File asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java:

Line 27: 
revert this file


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
File asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java:

Line 281:         String nodeFeedPort;
rename to nodeMessagingPort


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/pom.xml
File asterix-external-data/pom.xml:

Line 291:             <groupId>io.netty</groupId>
open a JIRA to add the library license if needed


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java:

Line 19: package org.apache.asterix.external.feed.management;
remove this file


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java:

Line 48:     private FeedChannelManager feedChannelManager;
remove variable and its usage.


Line 90:             e.printStackTrace();
Why catch this exception? Shouldn't it be rethrown as an IllegalStateException to prevent the NC from completing its startup in an invalid state?


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java:

Line 20: 
this class is not needed anymore, right?


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java:

Line 29: 
this class is not needed anymore, right?


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
File asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java:

Line 213:             }
You need to add a check here for the messaging port, in case it is defined at neither the node level nor the cluster level.


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
File asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java:

Line 1361:             stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
you need to update the yarn cluster file and pass the correct port.


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-HasComments: Yes

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 1: Verified+1

Build Successful 

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/870/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/657

to look at the new patch set (#2).

Change subject: Introduce NC to NC Messaging
......................................................................

Introduce NC to NC Messaging

This change introduces NC to NC messaging. NC message
broker takes care of listening to NC message port and
sending messages to other NCs.

Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageHandlerPool.java
A asterix-app/src/main/java/org/apache/asterix/messaging/MessageProcessor.java
M asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClient.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
A asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
M asterix-app/src/main/resources/asterix-build-configuration.xml
M asterix-app/src/main/resources/cluster.xml
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterix-common/src/main/resources/schema/cluster.xsd
M asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/1node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/2node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/4node.xml
M asterix-experiments/src/main/resources/ingestion-experiment-binary-and-configs/configs/8node.xml
M asterix-external-data/pom.xml
M asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
M asterix-external-data/src/main/java/org/apache/asterix/external/indexing/IndexingScheduler.java
M asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterix-installer/src/main/resources/clusters/demo/demo.xml
M asterix-installer/src/main/resources/clusters/local/local.xml
M asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M asterix-yarn/src/main/resources/configs/local.xml
M asterix-yarn/src/main/resources/configs/my_awesome_cluster_desc.xml
35 files changed, 751 insertions(+), 98 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/2
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>

Change in asterixdb[master]: WIP(nc2nc messaging)

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has abandoned this change.

Change subject: WIP(nc2nc messaging)
......................................................................


Abandoned

This change was moved to https://asterix-gerrit.ics.uci.edu/#/c/897/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: abandon
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Ian Maxon (Code Review)" <do...@asterixdb.incubator.apache.org>.
Ian Maxon has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 4: -Verified

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 3:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1070/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 1:

(32 comments)

https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
File asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java:

Line 89:             }
> revert this file
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
File asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java:

Line 117:             systemState = recoveryMgr.getSystemState();
> revert this file
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageDecoder.java:

Line 31:         System.err.println("NCMessageDecoder: decode called");
> Use logger.
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingClientHandler.java:

Line 62:         cause.printStackTrace();
> remove printStackTrace()
Done


Line 62:         cause.printStackTrace();
> Use logger to print the exception message
Done


Line 77:             if (future.isSuccess()) {
> use try-finally and put future.channel().close() in the finally block.
Done


Line 80:                 future.cause().printStackTrace();
> remove printStackTrace()
Done


Line 80:                 future.cause().printStackTrace();
> Use logger to print the exception message
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingHandler.java:

Line 41:         System.err.println("channelRead Called");
> remove println
Done


Line 47:             System.out.println(buf.readInt());
> remove println
Done


Line 54:         cause.printStackTrace();
> remove printStackTrace
Done


Line 54:         cause.printStackTrace();
> Use logger to print the exception message
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java
File asterix-app/src/main/java/org/apache/asterix/messaging/NCMessagingServer.java:

Line 32:     private int port;
> final
Done


Line 33:     private ChannelInboundHandlerAdapter handler = new NCMessagingHandler();
> final
Done


Line 34:     private ChannelFuture f;
> rename to channelFuture
Done


Line 64:                     // In this example, this does not happen, but you can do that to gracefully
> Remove comment after first line
Done


Line 68:                     // Do something
> remove comment
Done


Line 69:                     th.printStackTrace();
> remove printStackTrace()
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-app/src/main/resources/cluster.xml
File asterix-app/src/main/resources/cluster.xml:

Line 35:         <nc_messaging_port>4501</nc_messaging_port>
> rename to messaging_port
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-common/src/main/resources/schema/asterix-conf.xsd
File asterix-common/src/main/resources/schema/asterix-conf.xsd:

Line 60:         name="nc_messaging_port"
> rename to messagingPort (not that I like it, but just to be consistent with [...]
Done


Line 62:         
> ws
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java
File asterix-events/src/main/java/org/apache/asterix/event/management/EventTask.java:

Line 27: 
> revert this file
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
File asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java:

Line 281:         String nodeFeedPort;
> rename to nodeMessagingPort
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/pom.xml
File asterix-external-data/pom.xml:

Line 291:             <groupId>io.netty</groupId>
> open a JIRA to add the library license if needed
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedChannelManager.java:

Line 19: package org.apache.asterix.external.feed.management;
> remove this file
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/management/FeedManager.java:

Line 48:     private FeedChannelManager feedChannelManager;
> remove variable and its usage.
Done


Line 90:             e.printStackTrace();
> why catch this exception? shouldn't it be illegalState to prevent the NC fr[...]
Done


Line 90:             e.printStackTrace();
> remove printStackTrace()
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageHandler.java:

Line 20: 
> this class is not needed anymore, right?
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java
File asterix-external-data/src/main/java/org/apache/asterix/external/feed/message/FeedMessageServer.java:

Line 29: 
> this class is not needed anymore, right?
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
File asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java:

Line 213:             }
> you need to add the check here for messaging port if it isn't defined on bo[...]
Done


https://asterix-gerrit.ics.uci.edu/#/c/657/1/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
File asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java:

Line 1361:             stores.add(new Store(node.getId(), "4501", nodeStores.toString()));
> you need to update the yarn cluster file and pass the correct port.
Done


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
abdullah alamoudi has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 4:

Guys,
Please look at this change and let's get it in.

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Introduce NC to NC Messaging

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Introduce NC to NC Messaging
......................................................................


Patch Set 2: Verified-1

Build Failed 

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/1069/ : ABORTED

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: WIP(nc2nc messaging)

Posted by "abdullah alamoudi (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/657

to look at the new patch set (#5).

Change subject: WIP(nc2nc messaging)
......................................................................

WIP(nc2nc messaging)

Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplicationEntryPoint.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/messaging/NCMessageBroker.java
M asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
M asterixdb/asterix-app/src/main/resources/cluster.xml
M asterixdb/asterix-common/pom.xml
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixMetadataProperties.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixPropertiesAccessor.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/MessageHandlerPool.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/MessageProcessor.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingClient.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingClientHandler.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingHandler.java
A asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/NCMessagingServer.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/IApplicationMessage.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/messaging/api/INCMessageBroker.java
M asterixdb/asterix-common/src/main/resources/schema/asterix-conf.xsd
M asterixdb/asterix-common/src/main/resources/schema/cluster.xsd
M asterixdb/asterix-common/src/main/resources/schema/yarn_cluster.xsd
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/driver/EventDriver.java
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/management/EventUtil.java
M asterixdb/asterix-events/src/main/java/org/apache/asterix/event/service/AsterixEventServiceUtil.java
M asterixdb/asterix-installer/src/main/java/org/apache/asterix/installer/command/ValidateCommand.java
M asterixdb/asterix-om/src/main/java/org/apache/asterix/om/util/AsterixClusterProperties.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/GlobalResourceIdFactory.java
M asterixdb/asterix-yarn/src/main/java/org/apache/asterix/aoya/AsterixYARNClient.java
M hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/messages/IMessageBroker.java
M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/NetworkInputChannel.java
M hyracks-fullstack/hyracks/hyracks-comm/src/main/java/org/apache/hyracks/comm/channels/ReadBufferFactory.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/net/NetworkManager.java
M hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/PipelinedPartition.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelControlBlock.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/ChannelSet.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/IChannelReadInterface.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MultiplexedConnection.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/muxdemux/MuxDemux.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/ITCPConnectionEventListener.java
M hyracks-fullstack/hyracks/hyracks-net/src/main/java/org/apache/hyracks/net/protocols/tcp/TCPEndpoint.java
40 files changed, 1,021 insertions(+), 155 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/57/657/5
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/657
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I85bd5ea8ad5b50a6cfb25088d8853f26902799f9
Gerrit-PatchSet: 5
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <ba...@gmail.com>
Gerrit-Reviewer: Ian Maxon <im...@apache.org>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mi...@couchbase.com>
Gerrit-Reviewer: Murtadha Hubail <hu...@gmail.com>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Yingyi Bu <yi...@google.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>