You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/11/28 12:20:06 UTC
[1/2] apex-core git commit: APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication
Repository: apex-core
Updated Branches:
refs/heads/master 891ed3ae9 -> 4a1570df9
APEXCORE-456 - Explicitly limit Server.Subscriber to one way communication
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/308f1a7b
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/308f1a7b
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/308f1a7b
Branch: refs/heads/master
Commit: 308f1a7b0e91902ae5e6edbb614e7ea5ce975417
Parents: 5fb9d04
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Fri Nov 18 19:44:40 2016 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Sun Nov 20 17:42:13 2016 -0800
----------------------------------------------------------------------
.../bufferserver/internal/LogicalNode.java | 6 +-
.../bufferserver/internal/PhysicalNode.java | 25 ++--
.../datatorrent/bufferserver/server/Server.java | 117 +++++++------------
3 files changed, 54 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
index c08cfb9..ceb9469 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java
@@ -32,8 +32,8 @@ import com.datatorrent.bufferserver.policy.Policy;
import com.datatorrent.bufferserver.util.BitVector;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.bufferserver.util.SerializedData;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.EventLoop;
+import com.datatorrent.netlet.WriteOnlyClient;
/**
* LogicalNode represents a logical node in a DAG<p>
@@ -99,7 +99,7 @@ public class LogicalNode implements DataListener
*
* @param connection
*/
- public void addConnection(AbstractLengthPrependerClient connection)
+ public void addConnection(WriteOnlyClient connection)
{
PhysicalNode pn = new PhysicalNode(connection);
if (!physicalNodes.contains(pn)) {
@@ -111,7 +111,7 @@ public class LogicalNode implements DataListener
*
* @param client
*/
- public void removeChannel(AbstractLengthPrependerClient client)
+ public void removeChannel(WriteOnlyClient client)
{
for (PhysicalNode pn : physicalNodes) {
if (pn.getClient() == client) {
http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
index 424a51a..9a3fe37 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java
@@ -23,7 +23,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.bufferserver.util.SerializedData;
-import com.datatorrent.netlet.AbstractLengthPrependerClient;
+import com.datatorrent.netlet.WriteOnlyClient;
/**
* PhysicalNode represents one physical subscriber.
@@ -32,16 +32,16 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient;
*/
public class PhysicalNode
{
- public static final int BUFFER_SIZE = 8 * 1024;
private final long starttime;
- private final AbstractLengthPrependerClient client;
- private final long processedMessageCount;
+ private final WriteOnlyClient client;
+ private long processedMessageCount;
+ private SerializedData blocker;
/**
*
* @param client
*/
- public PhysicalNode(AbstractLengthPrependerClient client)
+ public PhysicalNode(WriteOnlyClient client)
{
this.client = client;
starttime = System.currentTimeMillis();
@@ -71,20 +71,11 @@ public class PhysicalNode
* @param d
* @throws InterruptedException
*/
- private SerializedData blocker;
-
public boolean send(SerializedData d)
{
- if (d.offset == d.dataOffset) {
- if (client.write(d.buffer, d.offset, d.length)) {
- return true;
- }
- } else {
- if (client.send(d.buffer, d.offset, d.length)) {
- return true;
- }
+ if (client.send(d.buffer, d.dataOffset, d.length - (d.dataOffset - d.offset))) {
+ return true;
}
-
blocker = d;
return false;
}
@@ -150,7 +141,7 @@ public class PhysicalNode
/**
* @return the channel
*/
- public AbstractLengthPrependerClient getClient()
+ public WriteOnlyClient getClient()
{
return client;
}
http://git-wip-us.apache.org/repos/asf/apex-core/blob/308f1a7b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
----------------------------------------------------------------------
diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
index 12eed5f..baa4e0b 100644
--- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
+++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java
@@ -54,6 +54,7 @@ import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener.ServerListener;
+import com.datatorrent.netlet.WriteOnlyLengthPrependerClient;
import com.datatorrent.netlet.util.VarInt;
/**
@@ -171,7 +172,7 @@ public class Server implements ServerListener
private final ConcurrentHashMap<String, DataList> publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>();
private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ClientListener> subscriberChannels = new ConcurrentHashMap<>();
private final int blockSize;
private final int numberOfCacheBlocks;
@@ -235,15 +236,18 @@ public class Server implements ServerListener
/**
*
* @param request
- * @param connection
+ * @param key
* @return
*/
- public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request,
- final AbstractLengthPrependerClient connection)
+ public LogicalNode handleSubscriberRequest(SubscribeRequestTuple request, SelectionKey key)
{
String identifier = request.getIdentifier();
String type = request.getStreamType();
String upstream_identifier = request.getUpstreamIdentifier();
+ final Subscriber subscriber = new Subscriber(type, request.getMask(), request.getPartitions(), request.getBufferSize());
+ key.attach(subscriber);
+ subscriber.registered(key);
+ subscriber.connected();
// Check if there is a logical node of this type, if not create it.
final LogicalNode ln;
@@ -252,7 +256,7 @@ public class Server implements ServerListener
/*
* close previous connection with the same identifier which is guaranteed to be unique.
*/
- AbstractLengthPrependerClient previous = subscriberChannels.put(identifier, connection);
+ ClientListener previous = subscriberChannels.put(identifier, subscriber);
if (previous != null) {
eventloop.disconnect(previous);
}
@@ -264,7 +268,7 @@ public class Server implements ServerListener
public void run()
{
ln.boot(eventloop);
- ln.addConnection(connection);
+ ln.addConnection(subscriber);
ln.catchUp();
}
});
@@ -302,7 +306,7 @@ public class Server implements ServerListener
@Override
public void run()
{
- ln.addConnection(connection);
+ ln.addConnection(subscriber);
ln.catchUp();
dl.addDataListener(ln);
}
@@ -312,6 +316,32 @@ public class Server implements ServerListener
return ln;
}
+ private void teardownSubscriber(Subscriber subscriber)
+ {
+ LogicalNode ln = subscriberGroups.get(subscriber.type);
+ if (ln != null) {
+ if (subscriberChannels.containsValue(subscriber)) {
+ final Iterator<Entry<String, ClientListener>> i = subscriberChannels.entrySet().iterator();
+ while (i.hasNext()) {
+ if (i.next().getValue() == subscriber) {
+ i.remove();
+ break;
+ }
+ }
+ }
+
+ ln.removeChannel(subscriber);
+ if (ln.getPhysicalNodeCount() == 0) {
+ DataList dl = publisherBuffers.get(ln.getUpstream());
+ if (dl != null) {
+ dl.removeDataListener(ln);
+ }
+ subscriberGroups.remove(ln.getGroup());
+ }
+ ln.getIterator().close();
+ }
+ }
+
/**
*
* @param request
@@ -464,43 +494,11 @@ public class Server implements ServerListener
/*
* unregister the unidentified client since its job is done!
*/
- unregistered(key);
+ unregistered(key.interestOps(0));
ignore = true;
logger.info("Received subscriber request: {}", request);
- SubscribeRequestTuple subscriberRequest = (SubscribeRequestTuple)request;
- AbstractLengthPrependerClient subscriber;
-
-// /* for backward compatibility - set the buffer size to 16k - EXPERIMENTAL */
- int bufferSize = subscriberRequest.getBufferSize();
-// if (bufferSize == 0) {
-// bufferSize = 16 * 1024;
-// }
- if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) {
- subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
- subscriberRequest.getPartitions(), bufferSize);
- } else {
- subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(),
- subscriberRequest.getPartitions(), bufferSize)
- {
- @Override
- public int readSize()
- {
- if (writeOffset - readOffset < 2) {
- return -1;
- }
-
- short s = buffer[readOffset++];
- return s | (buffer[readOffset++] << 8);
- }
-
- };
- }
- key.attach(subscriber);
- key.interestOps(SelectionKey.OP_WRITE | SelectionKey.OP_READ);
- subscriber.registered(key);
-
- handleSubscriberRequest(subscriberRequest, subscriber);
+ handleSubscriberRequest((SubscribeRequestTuple)request, key);
break;
case PURGE_REQUEST:
@@ -528,7 +526,7 @@ public class Server implements ServerListener
}
- class Subscriber extends AbstractLengthPrependerClient
+ private class Subscriber extends WriteOnlyLengthPrependerClient
{
private final String type;
private final int mask;
@@ -536,18 +534,11 @@ public class Server implements ServerListener
Subscriber(String type, int mask, int[] partitions, int bufferSize)
{
- super(1024, bufferSize);
+ super(1024 * 1024, bufferSize == 0 ? 256 * 1024 : bufferSize);
this.type = type;
this.mask = mask;
this.partitions = partitions;
- super.write = false;
- }
-
- @Override
- public void onMessage(byte[] buffer, int offset, int size)
- {
- logger.warn("Received data when no data is expected: {}",
- Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size)));
+ super.isWriteEnabled = false;
}
@Override
@@ -580,29 +571,7 @@ public class Server implements ServerListener
return;
}
torndown = true;
-
- LogicalNode ln = subscriberGroups.get(type);
- if (ln != null) {
- if (subscriberChannels.containsValue(this)) {
- final Iterator<Entry<String, AbstractLengthPrependerClient>> i = subscriberChannels.entrySet().iterator();
- while (i.hasNext()) {
- if (i.next().getValue() == this) {
- i.remove();
- break;
- }
- }
- }
-
- ln.removeChannel(this);
- if (ln.getPhysicalNodeCount() == 0) {
- DataList dl = publisherBuffers.get(ln.getUpstream());
- if (dl != null) {
- dl.removeDataListener(ln);
- }
- subscriberGroups.remove(ln.getGroup());
- }
- ln.getIterator().close();
- }
+ teardownSubscriber(this);
}
}
[2/2] apex-core git commit: Merge branch 'APEXCORE-456' of github.com:vrozov/apex-core
Posted by pr...@apache.org.
Merge branch 'APEXCORE-456' of github.com:vrozov/apex-core
Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/4a1570df
Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/4a1570df
Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/4a1570df
Branch: refs/heads/master
Commit: 4a1570df969957c8890c2984e9626816941148fd
Parents: 891ed3a 308f1a7
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Mon Nov 28 05:09:20 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 28 05:09:20 2016 -0700
----------------------------------------------------------------------
.../bufferserver/internal/LogicalNode.java | 6 +-
.../bufferserver/internal/PhysicalNode.java | 25 ++--
.../datatorrent/bufferserver/server/Server.java | 117 +++++++------------
3 files changed, 54 insertions(+), 94 deletions(-)
----------------------------------------------------------------------