You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by df...@apache.org on 2013/03/07 10:14:21 UTC
[1/2] git commit: S4-103 and S4-76 Fix stream ids and remove app ids
S4-103 and S4-76 Fix stream ids and remove app ids
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/cd8f28a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/cd8f28a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/cd8f28a0
Branch: refs/heads/dev
Commit: cd8f28a08b519b0b9c8ecf6aa194535ca519b726
Parents: 755ed6b
Author: Matthieu Morel <mm...@apache.org>
Authored: Fri Mar 1 13:38:17 2013 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Fri Mar 1 13:38:17 2013 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/base/Event.java | 33 ++-------------
.../org/apache/s4/comm/topology/RemoteStreams.java | 18 ++++++--
.../apache/s4/comm/topology/StreamConsumer.java | 11 +----
.../apache/s4/comm/topology/ZkRemoteStreams.java | 23 ++++------
.../src/main/java/org/apache/s4/core/App.java | 18 +--------
.../org/apache/s4/core/DefaultRemoteSenders.java | 3 +-
.../main/java/org/apache/s4/core/ReceiverImpl.java | 27 +++---------
.../main/java/org/apache/s4/core/RemoteStream.java | 3 +-
.../src/main/java/org/apache/s4/core/Stream.java | 1 -
9 files changed, 38 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index f4186a8..aaf8649 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -38,8 +38,7 @@ public class Event {
private static final Logger logger = LoggerFactory.getLogger(Event.class);
final private long time;
- private String streamName;
- private int appId;
+ private String streamId;
private Map<String, Data<?>> map;
/** Default constructor sets time using system time. */
@@ -67,8 +66,8 @@ public class Event {
*
* @return the target stream id
*/
- public String getStreamName() {
- return streamName;
+ public String getStreamId() {
+ return streamId;
}
/**
@@ -78,31 +77,7 @@ public class Event {
* mode.
*/
public void setStreamId(String streamName) {
- this.streamName = streamName;
- }
-
- /**
- * All events must be assigned the unique App ID of the App that owns the stream to which this event is injected.
- * The assignment must be done automatically by the stream that receives the event. Each application has a unique
- * ID. We use the app ID in combination with the stream ID to identify stream instances in a cluster.
- *
- *
- * @return the unique application ID.
- */
- public int getAppId() {
- return appId;
- }
-
- /**
- * All events must be assigned the unique App ID of the App that owns the stream to which this event is injected.
- * The assignment must be done automatically by the stream that receives the event. Each application has a unique
- * ID. We use the app ID in combination with the stream ID to identify stream instances in a cluster.
- *
- * @param appId
- * a unique application identifier, typically assigned by the deployment system.
- */
- public void setAppId(int appId) {
- this.appId = appId;
+ this.streamId = streamName;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index 9b9be90..2f15804 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -17,17 +17,27 @@ import java.util.Set;
public interface RemoteStreams {
+ /**
+ * Lists consumers of a given stream
+ */
public abstract Set<StreamConsumer> getConsumers(String streamName);
- public abstract void addOutputStream(String appId, String clusterName, String streamName);
+ /**
+ * Publishes availability of an output stream
+ *
+ * @param clusterName
+ * originating cluster
+ * @param streamName
+ * name of stream
+ */
+ public abstract void addOutputStream(String clusterName, String streamName);
/**
- * Publishes interest in a stream from an application.
+ * Publishes interest in a stream, by a given cluster
*
- * @param appId
* @param clusterName
* @param streamName
*/
- public abstract void addInputStream(int appId, String clusterName, String streamName);
+ public abstract void addInputStream(String clusterName, String streamName);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
index 52fa2a8..d969870 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/StreamConsumer.java
@@ -25,19 +25,13 @@ package org.apache.s4.comm.topology;
*/
public class StreamConsumer {
- int appId;
String clusterName;
- public StreamConsumer(int appId, String clusterName) {
+ public StreamConsumer(String clusterName) {
super();
- this.appId = appId;
this.clusterName = clusterName;
}
- public int getAppId() {
- return appId;
- }
-
public String getClusterName() {
return clusterName;
}
@@ -46,7 +40,6 @@ public class StreamConsumer {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + appId;
result = prime * result + ((clusterName == null) ? 0 : clusterName.hashCode());
return result;
}
@@ -60,8 +53,6 @@ public class StreamConsumer {
if (getClass() != obj.getClass())
return false;
StreamConsumer other = (StreamConsumer) obj;
- if (appId != other.appId)
- return false;
if (clusterName == null) {
if (other.clusterName != null)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
index 0855fc0..559bd68 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
@@ -164,8 +164,7 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
for (String element : elements) {
ZNRecord producerData = zkClient.readData(type.getPath(streamName) + "/" + element, true);
if (producerData != null) {
- StreamConsumer consumer = new StreamConsumer(Integer.valueOf(producerData.getSimpleField("appId")),
- producerData.getSimpleField("clusterName"));
+ StreamConsumer consumer = new StreamConsumer(producerData.getSimpleField("clusterName"));
consumers.add(consumer);
}
}
@@ -179,20 +178,18 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
* java.lang.String)
*/
@Override
- public void addOutputStream(String appId, String clusterName, String streamName) {
+ public void addOutputStream(String clusterName, String streamName) {
lock.lock();
try {
- logger.debug("Adding output stream [{}] for app [{}] in cluster [{}]", new String[] { streamName, appId,
- clusterName });
+ logger.debug("Adding output stream [{}] in cluster [{}]", new String[] { streamName, clusterName });
createStreamPaths(streamName);
- ZNRecord producer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
- producer.putSimpleField("appId", appId);
+ ZNRecord producer = new ZNRecord(streamName + "/" + clusterName);
producer.putSimpleField("clusterName", clusterName);
try {
zkClient.createEphemeralSequential(StreamType.PRODUCER.getPath(streamName) + "/producer-", producer);
} catch (Throwable e) {
logger.error("Exception trying to create producer stream [{}] for app [{}] and cluster [{}] : [{}] :",
- new String[] { streamName, appId, clusterName, e.getMessage() });
+ new String[] { streamName, clusterName, e.getMessage() });
}
refreshStreams();
} finally {
@@ -214,21 +211,19 @@ public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, Remo
* @see org.apache.s4.comm.topology.RemoteStreams#addInputStream(int, java.lang.String, java.lang.String)
*/
@Override
- public void addInputStream(int appId, String clusterName, String streamName) {
+ public void addInputStream(String clusterName, String streamName) {
lock.lock();
try {
- logger.debug("Adding input stream [{}] for app [{}] in cluster [{}]",
- new String[] { streamName, String.valueOf(appId), clusterName });
+ logger.debug("Adding input stream [{}] in cluster [{}]", new String[] { streamName, clusterName });
createStreamPaths(streamName);
- ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName + "/" + appId);
- consumer.putSimpleField("appId", String.valueOf(appId));
+ ZNRecord consumer = new ZNRecord(streamName + "/" + clusterName);
consumer.putSimpleField("clusterName", clusterName);
try {
// NOTE: We create 1 sequential znode per consumer node instance
zkClient.createEphemeralSequential(StreamType.CONSUMER.getPath(streamName) + "/consumer-", consumer);
} catch (Throwable e) {
logger.error("Exception trying to create consumer stream [{}] for app [{}] and cluster [{}] : [{}] :",
- new String[] { streamName, String.valueOf(appId), clusterName, e.getMessage() });
+ new String[] { streamName, clusterName, e.getMessage() });
}
refreshStreams();
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 6b8c7ef..8a5a276 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -63,7 +63,6 @@ public abstract class App {
Map<String, ProcessingElement> peByName = Maps.newHashMap();
private ClockType clockType = ClockType.WALL_CLOCK;
- private int id = -1;
@Inject
private Sender sender;
@@ -112,21 +111,6 @@ public abstract class App {
WALL_CLOCK, EVENT_CLOCK
};
- /**
- * @return the unique app id
- */
- public int getId() {
- return id;
- }
-
- /**
- * @param id
- * the unique id for this app
- */
- public void setId(int id) {
- this.id = id;
- }
-
/* Should only be used within the core package. */
void addPEPrototype(ProcessingElement pePrototype) {
pePrototypes.add(pePrototype);
@@ -379,7 +363,7 @@ public abstract class App {
*/
protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
ProcessingElement... processingElements) {
- remoteStreams.addInputStream(getId(), clusterName, streamName);
+ remoteStreams.addInputStream(clusterName, streamName);
return createStream(streamName, finder, processingElements);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
index 6aaa8f1..0f7db54 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
@@ -76,8 +76,7 @@ public class DefaultRemoteSenders implements RemoteSenders {
@Override
public void send(String hashKey, Event event) {
- Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamName());
- event.setAppId(-1);
+ Set<StreamConsumer> consumers = remoteStreams.getConsumers(event.getStreamId());
for (StreamConsumer consumer : consumers) {
// NOTE: even though there might be several ephemeral znodes for the same app and topology, they are
// represented by a single stream consumer
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
index 6b4a1cf..fff4bfd 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java
@@ -54,7 +54,7 @@ public class ReceiverImpl implements Receiver {
final private Listener listener;
final private SerializerDeserializer serDeser;
- private final Map<Integer, Map<String, Stream<? extends Event>>> streams;
+ private final Map<String, Stream<? extends Event>> streams;
@Inject
S4Metrics metrics;
@@ -74,24 +74,12 @@ public class ReceiverImpl implements Receiver {
/** Save stream keyed by app id and stream id. */
void addStream(Stream<? extends Event> stream) {
- int appId = stream.getApp().getId();
- Map<String, Stream<? extends Event>> appMap = streams.get(appId);
- if (appMap == null) {
- appMap = new MapMaker().makeMap();
- streams.put(appId, appMap);
- }
- appMap.put(stream.getName(), stream);
+ streams.put(stream.getName(), stream);
}
/** Remove stream when it is no longer needed. */
void removeStream(Stream<? extends Event> stream) {
- int appId = stream.getApp().getId();
- Map<String, Stream<? extends Event>> appMap = streams.get(appId);
- if (appMap == null) {
- logger.error("Tried to remove a stream that is not registered in the receiver.");
- return;
- }
- appMap.remove(stream.getName());
+ streams.remove(stream.getName());
}
@Override
@@ -99,15 +87,14 @@ public class ReceiverImpl implements Receiver {
metrics.receivedEventFromCommLayer(message.array().length);
Event event = (Event) serDeser.deserialize(message);
- String streamId = event.getStreamName();
+ String streamId = event.getStreamId();
/*
- * Match appId and streamId in event to the target stream and pass the event to the target stream. TODO: make
- * this more efficient for the case in which we send the same event to multiple PEs.
+ * Match streamId in event to the target stream and pass the event to the target stream. TODO: make this more
+ * efficient for the case in which we send the same event to multiple PEs.
*/
try {
- Map<String, Stream<? extends Event>> map = streams.get(-1);
- map.get(streamId).receiveEvent(event);
+ streams.get(streamId).receiveEvent(event);
} catch (NullPointerException e) {
logger.error("Could not find target stream for event with streamId={}", streamId);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
index a097c1d..c748aa2 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java
@@ -55,14 +55,13 @@ public class RemoteStream implements Streamable<Event> {
} else {
this.key = new Key<Event>(finder, DEFAULT_SEPARATOR);
}
- remoteStreams.addOutputStream(String.valueOf(app.getId()), clusterName, name);
+ remoteStreams.addOutputStream(clusterName, name);
}
@Override
public void put(Event event) {
event.setStreamId(getName());
- event.setAppId(app.getId());
if (key != null) {
remoteSenders.send(key.get(event), event);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/cd8f28a0/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 14baa2b..d078628 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -163,7 +163,6 @@ public class Stream<T extends Event> implements Streamable {
@SuppressWarnings("unchecked")
public void put(Event event) {
event.setStreamId(getName());
- event.setAppId(app.getId());
/*
* Events may be sent to local or remote partitions or both. The following code implements the logic.