You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/09/12 16:36:37 UTC
[2/7] git commit: S4-44 cleaned up some leftovers
S4-44 cleaned up some leftovers
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/a0574b51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/a0574b51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/a0574b51
Branch: refs/heads/dev-old
Commit: a0574b516d3936eb3c45284b91e04a121bb2700d
Parents: b6a90d0
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Mar 1 17:26:27 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 1 17:26:27 2012 +0100
----------------------------------------------------------------------
.../java/org/apache/s4/processor/PEContainer.java | 160 ++++++---------
1 files changed, 59 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a0574b51/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
index e50b24b..9b12ed7 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
@@ -55,7 +55,6 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
private int maxQueueSize = 1000;
private boolean trackByKey;
private Map<String, Integer> countByEventType = Collections.synchronizedMap(new HashMap<String, Integer>());
- private SafeKeeper safeKeeper;
private ControlEventProcessor controlEventProcessor = null;
@@ -80,7 +79,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
}
public void setSafeKeeper(SafeKeeper sk) {
- this.safeKeeper = sk;
+ // kept for backward compatibility with existing configuration files
}
public void addProcessor(AbstractPE processor) {
@@ -119,7 +118,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
/*
* (non-Javadoc)
- *
+ *
* @see
* org.apache.s4.processor.AsynchronousEventProcessor#queueWork(org.apache.s4.collector.
* EventWrapper)
@@ -153,7 +152,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
// run()
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.s4.processor.AsynchronousEventProcessor#getQueueSize()
*/
@Override
@@ -164,9 +163,9 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
/**
* An event is a control event if its stream name begins with the character
* '#'.
- *
+ *
* Control events are handled specially.
- *
+ *
* @param e
* the event wrapper to test
* @return true if and only if e is a control message.
@@ -218,68 +217,61 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
}
// printPlainPartitionInfoList(event.getCompoundKeyList());
- if (eventWrapper.getStreamName().endsWith("_checkpointing")
- || eventWrapper.getStreamName().endsWith("_recovery")) {
- // in that case, we don't need to iterate over all prototypes and advises:
- // the target PE is specified in the event
- handleCheckpointingOrRecovery(eventWrapper);
- } else {
-
- boolean ctrlEvent = testControlEvent(eventWrapper);
-
- // execute the PEs interested in this event
- for (int i = 0; i < prototypeWrappers.size(); i++) {
- if (logger.isDebugEnabled()) {
- logger.debug("STEP 6 (PEContainer): prototypeWrappers("
- + i
- + ") - "
- + prototypeWrappers.get(i).toString()
- + " - " + eventWrapper.getStreamName());
- }
-
- // first check if this is a control message and handle
- // it if
- // so.
- if (ctrlEvent) {
- if (controlEventProcessor != null) {
- controlEventProcessor.process(eventWrapper,
- prototypeWrappers.get(i));
- }
- continue;
- }
-
- // otherwise, continue processing event.
- List<EventAdvice> adviceList = adviceLists.get(i);
- for (EventAdvice eventAdvice : adviceList) {
- if (eventAdvice.getEventName().equals("*")
- || eventAdvice.getEventName().equals(
- eventWrapper.getStreamName())) {
- // event name matches
- } else {
- continue;
- }
-
- if (eventAdvice.getKey().equals("*")) {
- invokePE(prototypeWrappers.get(i).getPE("*"),
- eventWrapper, null);
- continue;
- }
-
- for (CompoundKeyInfo compoundKeyInfo : eventWrapper
- .getCompoundKeys()) {
- if (eventAdvice.getKey().equals(
- compoundKeyInfo.getCompoundKey())) {
- invokePE(
- prototypeWrappers
- .get(i)
- .getPE(compoundKeyInfo
- .getCompoundValue()),
- eventWrapper, compoundKeyInfo);
- }
- }
- }
- }
+ boolean ctrlEvent = testControlEvent(eventWrapper);
+
+ // execute the PEs interested in this event
+ for (int i = 0; i < prototypeWrappers.size(); i++) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("STEP 6 (PEContainer): prototypeWrappers("
+ + i
+ + ") - "
+ + prototypeWrappers.get(i).toString()
+ + " - " + eventWrapper.getStreamName());
+ }
+
+ // first check if this is a control message and handle
+ // it if
+ // so.
+ if (ctrlEvent) {
+ if (controlEventProcessor != null) {
+ controlEventProcessor.process(eventWrapper,
+ prototypeWrappers.get(i));
+ }
+
+ continue;
+ }
+
+ // otherwise, continue processing event.
+ List<EventAdvice> adviceList = adviceLists.get(i);
+ for (EventAdvice eventAdvice : adviceList) {
+ if (eventAdvice.getEventName().equals("*")
+ || eventAdvice.getEventName().equals(
+ eventWrapper.getStreamName())) {
+ // event name matches
+ } else {
+ continue;
+ }
+
+ if (eventAdvice.getKey().equals("*")) {
+ invokePE(prototypeWrappers.get(i).getPE("*"),
+ eventWrapper, null);
+ continue;
+ }
+
+ for (CompoundKeyInfo compoundKeyInfo : eventWrapper
+ .getCompoundKeys()) {
+ if (eventAdvice.getKey().equals(
+ compoundKeyInfo.getCompoundKey())) {
+ invokePE(
+ prototypeWrappers
+ .get(i)
+ .getPE(compoundKeyInfo
+ .getCompoundValue()),
+ eventWrapper, compoundKeyInfo);
+ }
+ }
+ }
}
endTime = System.currentTimeMillis();
@@ -299,40 +291,6 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
}
}
- private void handleCheckpointingOrRecovery(EventWrapper eventWrapper) {
- CheckpointingEvent checkpointingEvent = null;
- try {
- checkpointingEvent = (CheckpointingEvent) eventWrapper.getEvent();
- } catch (ClassCastException e) {
- logger.error("Checkpointing stream ["
- + eventWrapper.getStreamName()
- + "] can only handle checkpointing events. Received event is not a checkpointing event; it will be ignored.");
- return;
- }
- // 1. event is targeted towards PE prototype whose name is given by the
- // name of the stream
- // 2. PE id is given by the event
- for (int i = 0; i < prototypeWrappers.size(); i++) {
- if (checkpointingEvent.getSafeKeeperId().getPrototypeId()
- .equals(prototypeWrappers.get(i).getId())) {
-
- // check that PE is subscribed to checkpointing stream
- List<EventAdvice> advices = adviceLists.get(i);
- for (EventAdvice eventAdvice : advices) {
- if (eventAdvice.getEventName().equals(eventWrapper.getStreamName())){
- invokePE(
- prototypeWrappers.get(i).getPE(
- checkpointingEvent.getSafeKeeperId().getKey()),
- eventWrapper, null);
- break;
- }
- }
- }
- }
-
-
- }
-
private void invokePE(AbstractPE pe, EventWrapper eventWrapper,
CompoundKeyInfo compoundKeyInfo) {
try {