You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/11/28 12:20:06 UTC

[1/2] apex-core git commit: APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication

Repository: apex-core
Updated Branches:
  refs/heads/master 891ed3ae9 -> 4a1570df9


APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/308f1a7b
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/308f1a7b
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/308f1a7b

Branch: refs/heads/master
Commit: 308f1a7b0e91902ae5e6edbb614e7ea5ce975417
Parents: 5fb9d04
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Nov 18 19:44:40 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sun Nov 20 17:42:13 2016 -0800

----------------------------------------------------------------------
 .../bufferserver/internal/LogicalNode.java      |   6 +-
 .../bufferserver/internal/PhysicalNode.java     |  25 ++--
 .../datatorrent/bufferserver/server/Server.java | 117 +++++++------------
 3 files changed, 54 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index c08cfb9..ceb9469 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -32,8 +32,8 @@ import com.datatorrent.bufferserver.policy.Policy;
 import com.datatorrent.bufferserver.util.BitVector;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.bufferserver.util.SerializedData;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.netlet.WriteOnlyClient;
 
 /**
  * LogicalNode represents a logical node in a DAG<p>
@@ -99,7 +99,7 @@ public class LogicalNode implements DataListener
    *
    * @param connection
    */
-  public void addConnection(AbstractLengthPrependerClient connection)
+  public void addConnection(WriteOnlyClient connection)
   {
     PhysicalNode pn = new PhysicalNode(connection);
     if (!physicalNodes.contains(pn)) {
@@ -111,7 +111,7 @@ public class LogicalNode implements DataListener
    *
    * @param client
    */
-  public void removeChannel(AbstractLengthPrependerClient client)
+  public void removeChannel(WriteOnlyClient client)
   {
     for (PhysicalNode pn : physicalNodes) {
       if (pn.getClient() == client) {

http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
index 424a51a..9a3fe37 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
@@ -23,7 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.bufferserver.util.SerializedData;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.WriteOnlyClient;
 
 /**
  * PhysicalNode represents one physical subscriber.
@@ -32,16 +32,16 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient;
  */
 public class PhysicalNode
 {
-  public static final int BUFFER_SIZE = 8 * 1024;
   private final long starttime;
-  private final AbstractLengthPrependerClient client;
-  private final long processedMessageCount;
+  private final WriteOnlyClient client;
+  private long processedMessageCount;
+  private SerializedData blocker;
 
   /**
    *
    * @param client
    */
-  public PhysicalNode(AbstractLengthPrependerClient client)
+  public PhysicalNode(WriteOnlyClient client)
   {
     this.client = client;
     starttime = System.currentTimeMillis();
@@ -71,20 +71,11 @@ public class PhysicalNode
    * @param d
    * @throws InterruptedException
    */
-  private SerializedData blocker;
-
   public boolean send(SerializedData d)
   {
-    if (d.offset == d.dataOffset) {
-      if (client.write(d.buffer, d.offset, d.length)) {
-        return true;
-      }
-    } else {
-      if (client.send(d.buffer, d.offset, d.length)) {
-        return true;
-      }
+    if (client.send(d.buffer, d.dataOffset, d.length - (d.dataOffset - d.offset))) {
+      return true;
     }
-
     blocker = d;
     return false;
   }
@@ -150,7 +141,7 @@ public class PhysicalNode
   /**
    * @return the channel
    */
-  public AbstractLengthPrependerClient getClient()
+  public WriteOnlyClient getClient()
   {
     return client;
   }

http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 12eed5f..baa4e0b 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -54,6 +54,7 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient;
 import com.datatorrent.netlet.DefaultEventLoop;
 import com.datatorrent.netlet.EventLoop;
 import com.datatorrent.netlet.Listener.ServerListener;
+import com.datatorrent.netlet.WriteOnlyLengthPrependerClient;
 import com.datatorrent.netlet.util.VarInt;
 
 /**
@@ -171,7 +172,7 @@ public class Server implements ServerListener
   private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
   private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
   private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>();
+  private final ConcurrentHashMap<String, ClientListener> subscriberChannels = new ConcurrentHashMap<>();
   private final int blockSize;
   private final int numberOfCacheBlocks;
 
@@ -235,15 +236,18 @@ public class Server implements ServerListener
   /**
    *
    * @param request
-   * @param connection
+   * @param key
    * @return
    */
-  public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request,
-      final AbstractLengthPrependerClient connection)
+  public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, SelectionKey key)
   {
     String identifier = request.getIdentifier();
     String type = request.getStreamType();
     String upstream_identifier = request.getUpstreamIdentifier();
+    final Subscriber subscriber = new Subscriber(type, request.getMask(), request.getPartitions(), request.getBufferSize());
+    key.attach(subscriber);
+    subscriber.registered(key);
+    subscriber.connected();
 
     // Check if there is a logical node of this type, if not create it.
     final LogicalNode ln;
@@ -252,7 +256,7 @@ public class Server implements ServerListener
       /*
        * close previous connection with the same identifier which is guaranteed to be unique.
        */
-      AbstractLengthPrependerClient previous = subscriberChannels.put(identifier, connection);
+      ClientListener previous = subscriberChannels.put(identifier, subscriber);
       if (previous != null) {
         eventloop.disconnect(previous);
       }
@@ -264,7 +268,7 @@ public class Server implements ServerListener
         public void run()
         {
           ln.boot(eventloop);
-          ln.addConnection(connection);
+          ln.addConnection(subscriber);
           ln.catchUp();
         }
       });
@@ -302,7 +306,7 @@ public class Server implements ServerListener
         @Override
         public void run()
         {
-          ln.addConnection(connection);
+          ln.addConnection(subscriber);
           ln.catchUp();
           dl.addDataListener(ln);
         }
@@ -312,6 +316,32 @@ public class Server implements ServerListener
     return ln;
   }
 
+  private void teardownSubscriber(Subscriber subscriber)
+  {
+    LogicalNode ln = subscriberGroups.get(subscriber.type);
+    if (ln != null) {
+      if (subscriberChannels.containsValue(subscriber)) {
+        final Iterator<Entry<String, ClientListener>> i = subscriberChannels.entrySet().iterator();
+        while (i.hasNext()) {
+          if (i.next().getValue() == subscriber) {
+            i.remove();
+            break;
+          }
+        }
+      }
+
+      ln.removeChannel(subscriber);
+      if (ln.getPhysicalNodeCount() == 0) {
+        DataList dl = publisherBuffers.get(ln.getUpstream());
+        if (dl != null) {
+          dl.removeDataListener(ln);
+        }
+        subscriberGroups.remove(ln.getGroup());
+      }
+      ln.getIterator().close();
+    }
+  }
+
   /**
    *
    * @param request
@@ -464,43 +494,11 @@ public class Server implements ServerListener
           /*
            * unregister the unidentified client since its job is done!
            */
-          unregistered(key);
+          unregistered(key.interestOps(0));
           ignore = true;
           logger.info("Received subscriber request: {}", request);
 
-          SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request;
-          AbstractLengthPrependerClient subscriber;
-
-//          /* for backward compatibility - set the buffer size to 16k - EXPERIMENTAL */
-          int bufferSize = subscriberRequest.getBufferSize();
-//          if (bufferSize == 0) {
-//            bufferSize = 16 * 1024;
-//          }
-          if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) {
-            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
-                subscriberRequest.getPartitions(), bufferSize);
-          } else {
-            subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
-                subscriberRequest.getPartitions(), bufferSize)
-            {
-              @Override
-              public int readSize()
-              {
-                if (writeOffset - readOffset < 2) {
-                  return -1;
-                }
-
-                short s = buffer[readOffset++];
-                return s | (buffer[readOffset++] << 8);
-              }
-
-            };
-          }
-          key.attach(subscriber);
-          key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
-          subscriber.registered(key);
-
-          handleSubscriberRequest(subscriberRequest, subscriber);
+          handleSubscriberRequest((SubscribeRequestTuple)request, key);
           break;
 
         case PURGE_REQUEST:
@@ -528,7 +526,7 @@ public class Server implements ServerListener
 
   }
 
-  class Subscriber extends AbstractLengthPrependerClient
+  private class Subscriber extends WriteOnlyLengthPrependerClient
   {
     private final String type;
     private final int mask;
@@ -536,18 +534,11 @@ public class Server implements ServerListener
 
     Subscriber(String type, int mask, int[] partitions, int bufferSize)
     {
-      super(1024, bufferSize);
+      super(1024 * 1024, bufferSize == 0 ? 256 * 1024 : bufferSize);
       this.type = type;
       this.mask = mask;
       this.partitions = partitions;
-      super.write = false;
-    }
-
-    @Override
-    public void onMessage(byte[] buffer, int offset, int size)
-    {
-      logger.warn("Received data when no data is expected: {}",
-          Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
+      super.isWriteEnabled = false;
     }
 
     @Override
@@ -580,29 +571,7 @@ public class Server implements ServerListener
         return;
       }
       torndown = true;
-
-      LogicalNode ln = subscriberGroups.get(type);
-      if (ln != null) {
-        if (subscriberChannels.containsValue(this)) {
-          final Iterator<Entry<String, AbstractLengthPrependerClient>> i = subscriberChannels.entrySet().iterator();
-          while (i.hasNext()) {
-            if (i.next().getValue() == this) {
-              i.remove();
-              break;
-            }
-          }
-        }
-
-        ln.removeChannel(this);
-        if (ln.getPhysicalNodeCount() == 0) {
-          DataList dl = publisherBuffers.get(ln.getUpstream());
-          if (dl != null) {
-            dl.removeDataListener(ln);
-          }
-          subscriberGroups.remove(ln.getGroup());
-        }
-        ln.getIterator().close();
-      }
+      teardownSubscriber(this);
     }
 
   }


[2/2] apex-core git commit: Merge branch 'APEXCORE-456' of github.com:vrozov/apex-core

Posted by pr...@apache.org.
Merge branch 'APEXCORE-456' of github.com:vrozov/apex-core


Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/4a1570df
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/4a1570df
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/4a1570df

Branch: refs/heads/master
Commit: 4a1570df969957c8890c2984e9626816941148fd
Parents: 891ed3a 308f1a7
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Nov 28 05:09:20 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 28 05:09:20 2016 -0700

----------------------------------------------------------------------
 .../bufferserver/internal/LogicalNode.java      |   6 +-
 .../bufferserver/internal/PhysicalNode.java     |  25 ++--
 .../datatorrent/bufferserver/server/Server.java | 117 +++++++------------
 3 files changed, 54 insertions(+), 94 deletions(-)
----------------------------------------------------------------------