You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/09/12 16:36:37 UTC

[2/7] git commit: S4-44 cleaned up some leftovers

S4-44 cleaned up some leftovers

Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/a0574b51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/a0574b51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/a0574b51

Branch: refs/heads/dev-old
Commit: a0574b516d3936eb3c45284b91e04a121bb2700d
Parents: b6a90d0
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Mar 1 17:26:27 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Mar 1 17:26:27 2012 +0100

----------------------------------------------------------------------
 .../java/org/apache/s4/processor/PEContainer.java  |  160 ++++++---------
 1 files changed, 59 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a0574b51/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
index e50b24b..9b12ed7 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
@@ -55,7 +55,6 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
     private int maxQueueSize = 1000;
     private boolean trackByKey;
     private Map<String, Integer> countByEventType = Collections.synchronizedMap(new HashMap<String, Integer>());
-	private SafeKeeper safeKeeper;
 
     private ControlEventProcessor controlEventProcessor = null;
 
@@ -80,7 +79,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
     }
 
 	public void setSafeKeeper(SafeKeeper sk) {
-		this.safeKeeper = sk;
+		// kept for backward compatibility with existing configuration files
 	}
 
     public void addProcessor(AbstractPE processor) {
@@ -119,7 +118,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
 
     /*
      * (non-Javadoc)
-     * 
+     *
      * @see
      * org.apache.s4.processor.AsynchronousEventProcessor#queueWork(org.apache.s4.collector.
      * EventWrapper)
@@ -153,7 +152,7 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
     // run()
     /*
      * (non-Javadoc)
-     * 
+     *
      * @see org.apache.s4.processor.AsynchronousEventProcessor#getQueueSize()
      */
     @Override
@@ -164,9 +163,9 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
     /**
      * An event is a control event if its stream name begins with the character
      * '#'.
-     * 
+     *
      * Control events are handled specially.
-     * 
+     *
      * @param e
      *            the event wrapper to test
      * @return true if and only if e is a control message.
@@ -218,68 +217,61 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
                 }
                 // printPlainPartitionInfoList(event.getCompoundKeyList());
 
-                if (eventWrapper.getStreamName().endsWith("_checkpointing")
-                        || eventWrapper.getStreamName().endsWith("_recovery")) {
-                    // in that case, we don't need to iterate over all prototypes and advises: 
-                    // the target PE is specified in the event
-                    handleCheckpointingOrRecovery(eventWrapper);
-                } else {
-
-                    boolean ctrlEvent = testControlEvent(eventWrapper);
-
-                    // execute the PEs interested in this event
-                    for (int i = 0; i < prototypeWrappers.size(); i++) {
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("STEP 6 (PEContainer): prototypeWrappers("
-                                    + i
-                                    + ") - "
-                                    + prototypeWrappers.get(i).toString()
-                                    + " - " + eventWrapper.getStreamName());
-                        }
-
-                        // first check if this is a control message and handle
-                        // it if
-                        // so.
-                        if (ctrlEvent) {
-                            if (controlEventProcessor != null) {
-                                controlEventProcessor.process(eventWrapper,
-                                        prototypeWrappers.get(i));
-                            }
 
-                            continue;
-                        }
-
-                        // otherwise, continue processing event.
-                        List<EventAdvice> adviceList = adviceLists.get(i);
-                        for (EventAdvice eventAdvice : adviceList) {
-                            if (eventAdvice.getEventName().equals("*")
-                                    || eventAdvice.getEventName().equals(
-                                            eventWrapper.getStreamName())) {
-                                // event name matches
-                            } else {
-                                continue;
-                            }
-
-                            if (eventAdvice.getKey().equals("*")) {
-                                invokePE(prototypeWrappers.get(i).getPE("*"),
-                                        eventWrapper, null);
-                                continue;
-                            }
-                            
-                            for (CompoundKeyInfo compoundKeyInfo : eventWrapper
-                                    .getCompoundKeys()) {
-                                if (eventAdvice.getKey().equals(
-                                        compoundKeyInfo.getCompoundKey())) {
-                                    invokePE(
-                                            prototypeWrappers
-                                                    .get(i)
-                                                    .getPE(compoundKeyInfo
-                                                            .getCompoundValue()),
-                                            eventWrapper, compoundKeyInfo);
-                                }
-                            }
-                        }
-                    }
+                boolean ctrlEvent = testControlEvent(eventWrapper);
+
+                // execute the PEs interested in this event
+                for (int i = 0; i < prototypeWrappers.size(); i++) {
+                	if (logger.isDebugEnabled()) {
+                		logger.debug("STEP 6 (PEContainer): prototypeWrappers("
+                				+ i
+                				+ ") - "
+                				+ prototypeWrappers.get(i).toString()
+                				+ " - " + eventWrapper.getStreamName());
+                	}
+
+                	// first check if this is a control message and handle
+                	// it if
+                	// so.
+                	if (ctrlEvent) {
+                		if (controlEventProcessor != null) {
+                			controlEventProcessor.process(eventWrapper,
+                					prototypeWrappers.get(i));
+                		}
+
+                		continue;
+                	}
+
+                	// otherwise, continue processing event.
+                	List<EventAdvice> adviceList = adviceLists.get(i);
+                	for (EventAdvice eventAdvice : adviceList) {
+                		if (eventAdvice.getEventName().equals("*")
+                				|| eventAdvice.getEventName().equals(
+                						eventWrapper.getStreamName())) {
+                			// event name matches
+                		} else {
+                			continue;
+                		}
+
+                		if (eventAdvice.getKey().equals("*")) {
+                			invokePE(prototypeWrappers.get(i).getPE("*"),
+                					eventWrapper, null);
+                			continue;
+                		}
+
+                		for (CompoundKeyInfo compoundKeyInfo : eventWrapper
+                				.getCompoundKeys()) {
+                			if (eventAdvice.getKey().equals(
+                					compoundKeyInfo.getCompoundKey())) {
+                				invokePE(
+                						prototypeWrappers
+                						.get(i)
+                						.getPE(compoundKeyInfo
+                								.getCompoundValue()),
+                								eventWrapper, compoundKeyInfo);
+                			}
+                		}
+                	}
                 }
 
                 endTime = System.currentTimeMillis();
@@ -299,40 +291,6 @@ public class PEContainer implements Runnable, AsynchronousEventProcessor {
         }
     }
 
-    private void handleCheckpointingOrRecovery(EventWrapper eventWrapper) {
-        CheckpointingEvent checkpointingEvent = null;
-        try {
-            checkpointingEvent = (CheckpointingEvent) eventWrapper.getEvent();
-        } catch (ClassCastException e) {
-            logger.error("Checkpointing stream ["
-                    + eventWrapper.getStreamName()
-                    + "] can only handle checkpointing events. Received event is not a checkpointing event; it will be ignored.");
-            return;
-        }
-        // 1. event is targeted towards PE prototype whose name is given by the
-        // name of the stream
-        // 2. PE id is given by the event
-        for (int i = 0; i < prototypeWrappers.size(); i++) {
-            if (checkpointingEvent.getSafeKeeperId().getPrototypeId()
-                    .equals(prototypeWrappers.get(i).getId())) {
-                
-                // check that PE is subscribed to checkpointing stream
-                List<EventAdvice> advices = adviceLists.get(i);
-                for (EventAdvice eventAdvice : advices) {
-                    if (eventAdvice.getEventName().equals(eventWrapper.getStreamName())){
-                        invokePE(
-                                prototypeWrappers.get(i).getPE(
-                                        checkpointingEvent.getSafeKeeperId().getKey()),
-                                        eventWrapper, null);
-                        break;
-                    }
-                }
-            }
-        }
-        
-
-    }
-
     private void invokePE(AbstractPE pe, EventWrapper eventWrapper,
                           CompoundKeyInfo compoundKeyInfo) {
         try {