You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/07 10:14:21 UTC

[1/2] git commit: S4-103 and S4-76 Fix stream ids and remove app ids

S4-103 and S4-76 Fix stream ids and remove app ids


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/cd8f28a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/cd8f28a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/cd8f28a0

Branch: refs/heads/dev
Commit: cd8f28a08b519b0b9c8ecf6aa194535ca519b726
Parents: 755ed6b
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Mar 1 13:38:17 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Mar 1 13:38:17 2013 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/base/Event.java    |   33 ++-------------
 .../org/apache/s4/comm/topology/RemoteStreams.java |   18 ++++++--
 .../apache/s4/comm/topology/StreamConsumer.java    |   11 +----
 .../apache/s4/comm/topology/ZkRemoteStreams.java   |   23 ++++------
 .../src/main/java/org/apache/s4/core/App.java      |   18 +--------
 .../org/apache/s4/core/DefaultRemoteSenders.java   |    3 +-
 .../main/java/org/apache/s4/core/ReceiverImpl.java |   27 +++---------
 .../main/java/org/apache/s4/core/RemoteStream.java |    3 +-
 .../src/main/java/org/apache/s4/core/Stream.java   |    1 -
 9 files changed, 38 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index f4186a8..aaf8649 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -38,8 +38,7 @@ public class Event {
     private static final Logger logger = LoggerFactory.getLogger(Event.class);
 
     final private long time;
-    private String streamName;
-    private int appId;
+    private String streamId;
     private Map<String, Data<?>> map;
 
     /** Default constructor sets time using system time. */
@@ -67,8 +66,8 @@ public class Event {
      * 
      * @return the target stream id
      */
-    public String getStreamName() {
-        return streamName;
+    public String getStreamId() {
+        return streamId;
     }
 
     /**
@@ -78,31 +77,7 @@ public class Event {
      *            mode.
      */
     public void setStreamId(String streamName) {
-        this.streamName = streamName;
-    }
-
-    /**
-     * All events must be assigned the unique App ID of the App that owns the stream to which this event is injected.
-     * The assignment must be done automatically by the stream that receives the event. Each application has a unique
-     * ID. We use the app ID in combination with the stream ID to identify stream instances in a cluster.
-     * 
-     * 
-     * @return the unique application ID.
-     */
-    public int getAppId() {
-        return appId;
-    }
-
-    /**
-     * All events must be assigned the unique App ID of the App that owns the stream to which this event is injected.
-     * The assignment must be done automatically by the stream that receives the event. Each application has a unique
-     * ID. We use the app ID in combination with the stream ID to identify stream instances in a cluster.
-     * 
-     * @param appId
-     *            a unique application identifier, typically assigned by the deployment system.
-     */
-    public void setAppId(int appId) {
-        this.appId = appId;
+        this.streamId = streamName;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index 9b9be90..2f15804 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -17,17 +17,27 @@ import java.util.Set;
 
 public interface RemoteStreams {
 
+    /**
+     * Lists consumers of a given stream
+     */
     public abstract Set<StreamConsumer> getConsumers(String streamName);
 
-    public abstract void addOutputStream(String appId, String clusterName, String streamName);
+    /**
+     * Publishes availability of an output stream
+     * 
+     * @param clusterName
+     *            originating cluster
+     * @param streamName
+     *            name of stream
+     */
+    public abstract void addOutputStream(String clusterName, String streamName);
 
     /**
-     * Publishes interest in a stream from an application.
+     * Publishes interest in a stream, by a given cluster
      * 
-     * @param appId
      * @param clusterName
      * @param streamName
      */
-    public abstract void addInputStream(int appId, String clusterName, String streamName);
+    public abstract void addInputStream(String clusterName, String streamName);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
index 52fa2a8..d969870 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -25,19 +25,13 @@ package org.apache.s4.comm.topology;
  */
 public class StreamConsumer {
 
-    int appId;
     String clusterName;
 
-    public StreamConsumer(int appId, String clusterName) {
+    public StreamConsumer(String clusterName) {
         super();
-        this.appId = appId;
         this.clusterName = clusterName;
     }
 
-    public int getAppId() {
-        return appId;
-    }
-
     public String getClusterName() {
         return clusterName;
     }
@@ -46,7 +40,6 @@ public class StreamConsumer {
     public int hashCode() {
         final int prime = 31;
         int result = 1;
-        result = prime * result + appId;
         result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
         return result;
     }
@@ -60,8 +53,6 @@ public class StreamConsumer {
         if (getClass() != obj.getClass())
             return false;
         StreamConsumer other = (StreamConsumer) obj;
-        if (appId != other.appId)
-            return false;
         if (clusterName == null) {
             if (other.clusterName != null)
                 return false;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
index 0855fc0..559bd68 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -164,8 +164,7 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
         for (String element : elements) {
             ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
             if (producerData != null) {
-                StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
-                        producerData.getSimpleField("clusterName"));
+                StreamConsumer consumer = new StreamConsumer(producerData.getSimpleField("clusterName"));
                 consumers.add(consumer);
             }
         }
@@ -179,20 +178,18 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
      * java.lang.String)
      */
     @Override
-    public void addOutputStream(String appId, String clusterName, String streamName) {
+    public void addOutputStream(String clusterName, String streamName) {
         lock.lock();
         try {
-            logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
-                    clusterName });
+            logger.debug("Adding output stream [{}] in cluster [{}]", new String[] { streamName, clusterName });
             createStreamPaths(streamName);
-            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
-            producer.putSimpleField("appId", appId);
+            ZNRecord producer = new ZNRecord(streamName + "/" + clusterName);
             producer.putSimpleField("clusterName", clusterName);
             try {
                 zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
             } catch (Throwable e) {
                 logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
-                        new String[] { streamName, appId, clusterName, e.getMessage() });
+                        new String[] { streamName, clusterName, e.getMessage() });
             }
             refreshStreams();
         } finally {
@@ -214,21 +211,19 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
      * @see org.apache.s4.comm.topology.RemoteStreams#addInputStream(int, java.lang.String, java.lang.String)
      */
     @Override
-    public void addInputStream(int appId, String clusterName, String streamName) {
+    public void addInputStream(String clusterName, String streamName) {
         lock.lock();
         try {
-            logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
-                    new String[] { streamName, String.valueOf(appId), clusterName });
+            logger.debug("Adding input stream [{}] in cluster [{}]", new String[] { streamName, clusterName });
             createStreamPaths(streamName);
-            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
-            consumer.putSimpleField("appId", String.valueOf(appId));
+            ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName);
             consumer.putSimpleField("clusterName", clusterName);
             try {
                 // NOTE: We create 1 sequential znode per consumer node instance
                 zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
             } catch (Throwable e) {
                 logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
-                        new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
+                        new String[] { streamName, clusterName, e.getMessage() });
             }
             refreshStreams();
         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 6b8c7ef..8a5a276 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -63,7 +63,6 @@ public abstract class App {
     Map<String, ProcessingElement> peByName = Maps.newHashMap();
 
     private ClockType clockType = ClockType.WALL_CLOCK;
-    private int id = -1;
 
     @Inject
     private Sender sender;
@@ -112,21 +111,6 @@ public abstract class App {
         WALL_CLOCK, EVENT_CLOCK
     };
 
-    /**
-     * @return the unique app id
-     */
-    public int getId() {
-        return id;
-    }
-
-    /**
-     * @param id
-     *            the unique id for this app
-     */
-    public void setId(int id) {
-        this.id = id;
-    }
-
     /* Should only be used within the core package. */
     void addPEPrototype(ProcessingElement pePrototype) {
         pePrototypes.add(pePrototype);
@@ -379,7 +363,7 @@ public abstract class App {
      */
     protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
             ProcessingElement... processingElements) {
-        remoteStreams.addInputStream(getId(), clusterName, streamName);
+        remoteStreams.addInputStream(clusterName, streamName);
         return createStream(streamName, finder, processingElements);
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index 6aaa8f1..0f7db54 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -76,8 +76,7 @@ public class DefaultRemoteSenders implements RemoteSenders {
     @Override
     public void send(String hashKey, Event event) {
 
-        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
-        event.setAppId(-1);
+        Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamId());
         for (StreamConsumer consumer : consumers) {
             // NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
             // represented by a single stream consumer

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
index 6b4a1cf..fff4bfd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
@@ -54,7 +54,7 @@ public class ReceiverImpl implements Receiver {
 
     final private Listener listener;
     final private SerializerDeserializer serDeser;
-    private final Map<Integer, Map<String, Stream<? extends Event>>> streams;
+    private final Map<String, Stream<? extends Event>> streams;
 
     @Inject
     S4Metrics metrics;
@@ -74,24 +74,12 @@ public class ReceiverImpl implements Receiver {
 
     /** Save stream keyed by app id and stream id. */
     void addStream(Stream<? extends Event> stream) {
-        int appId = stream.getApp().getId();
-        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
-        if (appMap == null) {
-            appMap = new MapMaker().makeMap();
-            streams.put(appId, appMap);
-        }
-        appMap.put(stream.getName(), stream);
+        streams.put(stream.getName(), stream);
     }
 
     /** Remove stream when it is no longer needed. */
     void removeStream(Stream<? extends Event> stream) {
-        int appId = stream.getApp().getId();
-        Map<String, Stream<? extends Event>> appMap = streams.get(appId);
-        if (appMap == null) {
-            logger.error("Tried to remove a stream that is not registered in the receiver.");
-            return;
-        }
-        appMap.remove(stream.getName());
+        streams.remove(stream.getName());
     }
 
     @Override
@@ -99,15 +87,14 @@ public class ReceiverImpl implements Receiver {
         metrics.receivedEventFromCommLayer(message.array().length);
         Event event = (Event) serDeser.deserialize(message);
 
-        String streamId = event.getStreamName();
+        String streamId = event.getStreamId();
 
         /*
-         * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO: make
-         * this more efficient for the case in which we send the same event to multiple PEs.
+         * Match streamId in event to the target stream and pass the event to the target stream. TODO: make this more
+         * efficient for the case in which we send the same event to multiple PEs.
          */
         try {
-            Map<String, Stream<? extends Event>> map = streams.get(-1);
-            map.get(streamId).receiveEvent(event);
+            streams.get(streamId).receiveEvent(event);
         } catch (NullPointerException e) {
             logger.error("Could not find target stream for event with streamId={}", streamId);
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
index a097c1d..c748aa2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
@@ -55,14 +55,13 @@ public class RemoteStream implements Streamable<Event> {
         } else {
             this.key = new Key<Event>(finder, DEFAULT_SEPARATOR);
         }
-        remoteStreams.addOutputStream(String.valueOf(app.getId()), clusterName, name);
+        remoteStreams.addOutputStream(clusterName, name);
 
     }
 
     @Override
     public void put(Event event) {
         event.setStreamId(getName());
-        event.setAppId(app.getId());
 
         if (key != null) {
             remoteSenders.send(key.get(event), event);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 14baa2b..d078628 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -163,7 +163,6 @@ public class Stream<T extends Event> implements Streamable {
     @SuppressWarnings("unchecked")
     public void put(Event event) {
         event.setStreamId(getName());
-        event.setAppId(app.getId());
 
         /*
          * Events may be sent to local or remote partitions or both. The following code implements the logic.