You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC

[38/50] [abbrv] Rename packages in preparation for move to Apache

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
new file mode 100644
index 0000000..3dfea86
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.persist;
+
+import org.apache.s4.util.clock.Clock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+public class HashMapPersister implements Persister {
+    private volatile int persistCount = 0;
+    private boolean selfClean = false;
+    private int cleanWaitTime = 40; // 40 seconds by default
+    private String loggerName = "s4";
+    Map<String, CacheEntry> cache;
+    Clock s4Clock;
+
+    private int startCapacity = 5000;
+
+    public void setStartCapacity(int startCapacity) {
+        this.startCapacity = startCapacity;
+    }
+
+    public int getStartCapacity() {
+        return startCapacity;
+    }
+
+    public void setSelfClean(boolean selfClean) {
+        this.selfClean = selfClean;
+    }
+
+    public void setCleanWaitTime(int cleanWaitTime) {
+        this.cleanWaitTime = cleanWaitTime;
+    }
+
+    public void setLoggerName(String loggerName) {
+        this.loggerName = loggerName;
+    }
+
+    public HashMapPersister(Clock s4Clock) {
+        this.s4Clock = s4Clock;
+    }
+    
+    public void setS4Clock(Clock s4Clock) {
+        this.s4Clock = s4Clock;
+    }
+
+    public void init() {
+        cache = Collections.synchronizedMap(new HashMap<String, CacheEntry>(this.getStartCapacity()));
+
+        if (selfClean) {
+            Runnable r = new Runnable() {
+                public void run() {
+                    while (!Thread.interrupted()) {
+                        int cleanCount = HashMapPersister.this.cleanOutGarbage();
+                        Logger.getLogger(loggerName).info("Cleaned out "
+                                + cleanCount + " entries; Persister has "
+                                + cache.size() + " entries");
+                        try {
+                            Thread.sleep(cleanWaitTime * 1000);
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+                }
+            };
+            Thread t = new Thread(r);
+            t.start();
+            t.setPriority(Thread.MIN_PRIORITY);
+        }
+    }
+
+    public int getQueueSize() {
+        return 0;
+    }
+
+    public int getPersistCount() {
+        return persistCount;
+    }
+
+    public int getCacheEntryCount() {
+        return cache.size();
+    }
+
+    public void setAsynch(String key, Object value, int period) {
+        // there really is no asynch for the local cache
+        set(key, value, period);
+    }
+
+    public void set(String key, Object value, int period) {
+        if (value == null) {
+            cache.remove(key);
+            return;
+        }
+        
+        synchronized (this) {
+            persistCount++;
+        }
+
+        CacheEntry ce = new CacheEntry();
+        ce.value = value;
+        ce.period = period;
+        ce.addTime = s4Clock.getCurrentTime();
+        cache.put(key, ce);
+    }
+
+    public Object get(String key) {
+        CacheEntry ce = cache.get(key);
+        if (ce == null) {
+            return null;
+        }
+
+        if (ce.isExpired()) {
+            return null;
+        }
+
+        return ce.value;
+    }
+
+    public Map<String, Object> getBulk(String[] keys) {
+        HashMap map = new HashMap<String, Object>();
+        for (String key : keys) {
+            Object value = get(key);
+            if (value != null) {
+                map.put(key, value);
+            }
+        }
+        return map;
+    }
+
+    public Object getObject(String key) {
+        return get(key);
+    }
+
+    public Map<String, Object> getBulkObjects(String[] keys) {
+        return getBulk(keys);
+    }
+
+    public void remove(String key) {
+        cache.remove(key);
+    }
+
+    public int cleanOutGarbage() {
+        int count = 0;
+        List<String> keyList;
+        synchronized (cache) {
+            keyList = new ArrayList<String>(cache.size());
+            for (String key : cache.keySet()) {
+                keyList.add(key);
+            }
+        }
+
+        for (String key : keyList) {
+            CacheEntry ce = cache.get(key);
+            if (ce != null && ce.isExpired()) {
+                count++;
+                cache.remove(key);
+            }
+        }
+        return count;
+    }
+
+    public Set<String> keySet() {
+        Set<String> keys = new HashSet<String>();
+        synchronized (cache) {
+            for (String key : cache.keySet()) {
+                keys.add(key);
+            }
+        }
+        return keys;
+    }
+
+    public class CacheEntry {
+        Object value;
+        long addTime;
+        int period;
+
+        public boolean isExpired() {
+            if (period > 0) {
+                if ((addTime + (1000 * (long) period)) <= s4Clock.getCurrentTime()) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/Persister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/Persister.java b/s4-core/src/main/java/org/apache/s4/persist/Persister.java
new file mode 100644
index 0000000..114043c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/Persister.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.persist;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Defines an interface to a collection of key/value pairs, each of which has a
+ * time-to-live property.
+ **/
+public interface Persister {
+    /**
+     * Returns the number of <code>setAsynch</code>-initiated operations that
+     * are pending or in-progress.
+     * 
+     * @return the number of pending set operations
+     **/
+    int getQueueSize();
+
+    /**
+     * Returns the number of <code>set</code> or <code>setAsynch</code> calls
+     * made to this object.
+     * 
+     * @return the number of <code>set</code> or <code>setAsynch</code> calls
+     *         made to this object.
+     **/
+    int getPersistCount();
+
+    /**
+     * Returns the number of key/value pairs in this Persister object.
+     * <p>
+     * If the underlying technology (e.g., Memcached) does not support this
+     * operation, then the implementation may return 0 or a
+     * <code>java.lang.UnsupportedOperationException</code>.
+     * <p>
+     * This number may include expired entries, depending on the implementation.
+     * Some implementations clean up expired entries only when the value is
+     * requested or at certain time intervals (or not at all).
+     * 
+     * @return the number of entries in this Persister
+     **/
+    int getCacheEntryCount();
+
+    /**
+     * Set a value for a key without waiting for the operation to complete.
+     * <p>
+     * This method is useful in the case where the underlying implementation
+     * uses a network but the caller should not be delayed by the network
+     * operation.
+     * <p>
+     * Because the set operation is not neccessarily complete when the method
+     * returns, a subsequent <code>get</code> call may return stale data.
+     * <p>
+     * <code>period</code> specifies the time-to-live for this key/value pair.
+     * For calls to <code>get</code> or <code>getObject</code> after the
+     * specified period, the Persister object will return null.
+     * 
+     * @param key
+     *            the key
+     * @param value
+     *            the value
+     * @param period
+     *            the maximum interval of time during which this key/value pair
+     *            is valid. Also know as time-to-live. -1 indicates an infinite
+     *            time-to-live.
+     **/
+    void setAsynch(String key, Object value, int period);
+
+    /**
+     * Set a value for a key.
+     * <p>
+     * <code>period</code> specifies the time-to-live for this key/value pair.
+     * For calls to <code>get</code> or <code>getObject</code> after the
+     * specified period, the Persister object will return null.
+     * 
+     * @param key
+     *            the key
+     * @param value
+     *            the value
+     * @param period
+     *            the maximum interval of time during which this key/value pair
+     *            is valid. Also know as time-to-live. -1 indicates an infinite
+     *            time-to-live.
+     **/
+    void set(String key, Object value, int period) throws InterruptedException;
+
+    /**
+     * Get the value associated with a specified key.
+     * 
+     * If the period (aka time-to-live) for this key/value pair is expired, the
+     * <code>get</code> will return null.
+     * 
+     * @param key
+     *            the key
+     **/
+    Object get(String key) throws InterruptedException;
+
+    /**
+     * Get the values associated with a list of keys.
+     * <p>
+     * <code>getBulk</code> returns a <code>Map</code> containing an entry for
+     * each key/value pair. The map contains an entry for a specified key only
+     * if that key exists in the Persister and the associated key/value pair is
+     * not yet expired.
+     * 
+     * @param keys
+     *            a list of keys
+     **/
+    Map<String, Object> getBulk(String[] keys) throws InterruptedException;
+
+    /**
+     * This is a method to help support some implementations whose underlying
+     * technology encodes the value associated with a key. In some cases, the
+     * value may have been stored by a client that uses a form of encoding not
+     * supported by the Persister's implementation. In that case, one might want
+     * the raw, yet-to-be-decoded value associated with the key.
+     * <p>
+     * <code>getObject</code> retrieves the raw, unencoded value associated with
+     * a key. In all other respects, it's the same as {@link Persister#get}.
+     * <p>
+     * It's likely you will never need to call this method.
+     * 
+     * If the period (aka time-to-live) for this key/value pair is expired, the
+     * <code>get</code> will return null.
+     **/
+    Object getObject(String key) throws InterruptedException;
+
+    /**
+     * As with {@link Persister#getObject}, this is a method to help support
+     * some implementations whose underlying technology encodes result.
+     * <p>
+     * It's likely you will never need to call this method.
+     * <p>
+     * In all other respects, it is the same as {@link Persister#getBulk}.
+     **/
+    Map<String, Object> getBulkObjects(String[] keys)
+            throws InterruptedException;
+
+    /**
+     * Removes the entry for the specified key from the Persister object.
+     * 
+     * @param key
+     *            the key of the entry to be removed
+     **/
+    void remove(String key) throws InterruptedException;
+
+    /**
+     * Forces the initiation of the process that cleans up expired entries.
+     * <p>
+     * Normally, you would not call this method. The implementation takes care
+     * of the timing of the cleanup process.
+     * <p>
+     * In some implementations, this method may be a no-op.
+     * 
+     **/
+    int cleanOutGarbage() throws InterruptedException;
+
+    /**
+     * Returns a <code>Set</code> of the keys contained in this Persister
+     * object.
+     * <p>
+     * The <code>Set</code> may contain keys for expired entries, depending on
+     * how often the underlying implementation cleans up expired entries.
+     * <p>
+     * The underlying technology may not support this operation. As a result,
+     * some implementations return an empty set.
+     * 
+     * @return a <code>Set</code> of the keys contained in this Persister
+     *         object.
+     **/
+    Set<String> keySet();
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
new file mode 100644
index 0000000..5852ec8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -0,0 +1,778 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *            http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
+import org.apache.s4.ft.InitiateCheckpointingEvent;
+import org.apache.s4.ft.RecoveryEvent;
+import org.apache.s4.ft.SafeKeeper;
+import org.apache.s4.ft.SafeKeeperId;
+import org.apache.s4.persist.Persister;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+import org.apache.s4.util.clock.Clock;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This is the base class for processor classes.
+ * <p>
+ * <code>AbstractProcessor</code> provides output frequency strategies that
+ * allow you to configure the rate at which your processor produces output (see
+ * {@link AbstractPE#setOutputFrequencyByEventCount} and
+ * {@link AbstractPE#setOutputFrequencyByTimeBoundary}.
+ */
+public abstract class AbstractPE implements Cloneable {
+    public static enum FrequencyType {
+        TIMEBOUNDARY("timeboundary"), EVENTCOUNT("eventcount");
+
+        private String name;
+
+        FrequencyType(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return this.name;
+        }
+    }
+
+    public static enum PeriodicInvokerType {
+        OUTPUT, CHECKPOINTING;
+
+        public String getName() {
+            if (OUTPUT == this) {
+                return "PeriodicOutputInvoker";
+            } else {
+                return "PeriodicCheckpointingInvoker";
+            }
+        }
+    }
+
+    transient private Clock clock;
+    // FIXME replaces monitor wait on AbstractPE, for triggering possible extra
+    // thread when checkpointing activated
+    transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
+    transient private int outputFrequency = 1;
+    transient private FrequencyType outputFrequencyType = FrequencyType.EVENTCOUNT;
+    transient private int outputFrequencyOffset = 0;
+    transient private int eventCount = 0;
+    transient private int ttl = -1;
+    transient private Persister lookupTable;
+    transient private List<EventAdvice> eventAdviceList = new ArrayList<EventAdvice>();
+    transient private List<Object> keyValue;
+    transient private List<Object> keyRecord;
+    private String keyValueString;
+    transient private String streamName;
+    transient private boolean saveKeyRecord = false;
+    transient private int outputsBeforePause = -1;
+    transient private long pauseTimeInMillis;
+    transient private boolean logPauses = false;
+    private String id;
+    transient protected SchemaContainer schemaContainer = new SchemaContainer();
+    transient private PrototypeWrapper prototypeWrapper;
+
+    transient private boolean recoveryAttempted = false;
+    // true if state may have changed
+    transient private boolean checkpointable = false;
+    // use a flag for identifying checkpointing events
+    transient private boolean isCheckpointingEvent = false;
+
+    transient private SafeKeeper safeKeeper; // handles fault tolerance
+    transient private CountDownLatch safeKeeperSetSignal = new CountDownLatch(1);
+    transient private int checkpointingFrequency = 0;
+    transient private FrequencyType checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
+    transient private int checkpointingFrequencyOffset = 0;
+    transient private int checkpointableEventCount = 0;
+    transient private int checkpointsBeforePause = -1;
+    transient private long checkpointingPauseTimeInMillis;
+
+    transient private OverloadDispatcher overloadDispatcher;
+
+    public void setSaveKeyRecord(boolean saveKeyRecord) {
+        this.saveKeyRecord = saveKeyRecord;
+    }
+
+    public void setOutputsBeforePause(int outputsBeforePause) {
+        this.outputsBeforePause = outputsBeforePause;
+    }
+
+    public void setCheckpointsBeforePause(int checkpointsBeforePause) {
+        this.checkpointsBeforePause = checkpointsBeforePause;
+    }
+
+    public void setPauseTimeInMillis(long pauseTimeInMillis) {
+        this.pauseTimeInMillis = pauseTimeInMillis;
+    }
+
+    public void setCheckpointingPauseTimeInMillis(long checkpointingPauseTimeInMillis) {
+        this.checkpointingPauseTimeInMillis = checkpointingPauseTimeInMillis;
+    }
+
+    public void setLogPauses(boolean logPauses) {
+        this.logPauses = logPauses;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public void setClock(Clock clock) {
+        this.clock = clock;
+        if (this.clock != null) {
+            this.s4ClockSetSignal.countDown();
+        }
+    }
+
+    /**
+     * This method will be called after the object is cloned from the prototype
+     * PE. The concrete PE class should override this if it has any special
+     * set-up requirements.
+     */
+    public void initInstance() {
+        // default implementation does nothing.
+    }
+
+    public Clock getClock() {
+        return clock;
+    }
+
+    public void setPrototypeWrapper(PrototypeWrapper prototypeWrapper) {
+        this.prototypeWrapper = prototypeWrapper;
+    }
+
+    public AbstractPE() {
+        OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
+        Class<?> overloadDispatcherClass = oldg.generate();
+
+        try {
+            overloadDispatcher = (OverloadDispatcher) overloadDispatcherClass.newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * You should not override this method. Instead, you need to implement the
+     * <code>processEvent</code> method.
+     **/
+    public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object event) {
+        // if this is the first time through, get the key for this PE
+        if (keyValue == null || saveKeyRecord) {
+            setKeyValue(event, compoundKeyInfo);
+
+            if (compoundKeyInfo != null)
+                keyValueString = compoundKeyInfo.getCompoundValue();
+        }
+
+        this.streamName = streamName;
+
+        if (safeKeeper != null) {
+            // initialize checkpointing event flag
+            this.isCheckpointingEvent = false;
+            if (!recoveryAttempted) {
+                recover();
+                recoveryAttempted = true;
+            }
+        }
+
+        overloadDispatcher.dispatch(this, event);
+
+        if (saveKeyRecord) {
+            keyRecord.clear(); // the PE doesn't need it anymore
+        }
+
+        if (outputFrequencyType == FrequencyType.EVENTCOUNT && outputFrequency > 0 && !isCheckpointingEvent) {
+            eventCount++;
+            if (eventCount % outputFrequency == 0) {
+                try {
+                    output();
+                } catch (Exception e) {
+                    Logger.getLogger("s4").error("Exception calling output() method in execute()", e);
+                }
+            }
+        }
+
+        // do not take into account checkpointing/recovery trigger messages
+        if (!isCheckpointingEvent) {
+            checkpointable = true; // dirty flag
+            if (checkpointingFrequencyType == FrequencyType.EVENTCOUNT && checkpointingFrequency > 0) {
+                checkpointableEventCount++;
+                if (checkpointableEventCount % checkpointingFrequency == 0) {
+                    // for count-based frequency, we directly checkpoint here
+                    checkpoint();
+                }
+            }
+
+        }
+    }
+
+    public long getCurrentTime() {
+        return clock.getCurrentTime();
+    }
+
+    /**
+     * This method returns the key value associated with this PE.
+     * <p>
+     * The key value is a list because the key may be a compound (composite)
+     * key, in which case the key will have one value for each simple key.
+     * 
+     * @return the key value as a List of Objects (each element contains the
+     *         value of a simple key).
+     **/
+    public List<Object> getKeyValue() {
+        return keyValue;
+    }
+
+    public List<Object> getKeyRecord() {
+        return keyRecord;
+    }
+
+    public String getKeyValueString() {
+        return keyValueString;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    private void setKeyValue(Object event, CompoundKeyInfo compoundKeyInfo) {
+        if (compoundKeyInfo == null) {
+            return;
+        }
+
+        keyValue = new ArrayList<Object>();
+
+        Schema schema = schemaContainer.getSchema(event.getClass());
+
+        // get the value for each keyInfo
+        for (KeyInfo keyInfo : compoundKeyInfo.getKeyInfoList()) {
+            Object value = null;
+            Object record = event;
+            List<?> list = null;
+            Property property = null;
+            for (KeyPathElement keyPathElement : keyInfo.getKeyPath()) {
+                if (keyPathElement instanceof KeyPathElementIndex) {
+                    record = list.get(((KeyPathElementIndex) keyPathElement).getIndex());
+                    schema = property.getComponentProperty().getSchema();
+                } else {
+                    String keyPathElementName = ((KeyPathElementName) keyPathElement).getKeyName();
+                    property = schema.getProperties().get(keyPathElementName);
+                    value = null;
+                    try {
+                        value = property.getGetterMethod().invoke(record);
+                    } catch (Exception e) {
+                        Logger.getLogger("s4").error(e);
+                        return;
+                    }
+
+                    if (value == null) {
+                        Logger.getLogger("s4").error("Value for " + keyPathElementName + " is null!");
+                        return;
+                    }
+
+                    if (property.getType().isPrimitive() || property.isNumber()
+                            || property.getType().equals(String.class)) {
+                        keyValue.add(value);
+                        if (saveKeyRecord) {
+                            if (keyRecord == null) {
+                                keyRecord = new ArrayList<Object>();
+                            }
+                            keyRecord.add(record);
+                        }
+                        continue;
+                    } else if (property.isList()) {
+                        try {
+                            list = (List) property.getGetterMethod().invoke(record);
+                        } catch (Exception e) {
+                            Logger.getLogger("s4").error(e);
+                            return;
+                        }
+                    } else {
+                        try {
+                            record = property.getGetterMethod().invoke(record);
+                        } catch (Exception e) {
+                            Logger.getLogger("s4").error(e);
+                            return;
+                        }
+                        schema = property.getSchema();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * This method sets the output strategy to "by event count" and specifies
+     * how many events trigger a call to the <code>output</code> method.
+     * <p>
+     * You would not normally call this method directly, but instead via the S4
+     * configuration file.
+     * <p>
+     * After this method is called, AbstractProcessor will call your
+     * <code>output</code> method (implemented in your subclass) every
+     * <emp>outputFrequency</emph> events.
+     * <p>
+     * If you call neither <code>setOutputFrequencyByEventCount</code> nor
+     * <code>setOutputFrequencyByTimeBoundary</code>, the default strategy is
+     * "by event count" with an output frequency of 1. (That is,
+     * <code>output</code> is called after after each return from
+     * <code>processEvent</code>).
+     * 
+     * @param outputFrequency
+     *            the number of application events passed to
+     *            <code>processEvent</code> before output is called.
+     **/
+    public void setOutputFrequencyByEventCount(int outputFrequency) {
+        this.outputFrequency = outputFrequency;
+        this.outputFrequencyType = FrequencyType.EVENTCOUNT;
+        initFrequency(PeriodicInvokerType.OUTPUT);
+    }
+
+    /**
+     * Sets the frequency strategy to "by event count". Uses the same mechanism
+     * than {@link #setOutputFrequencyByEventCount(int)}
+     * 
+     * @param checkpointingFrequency
+     *            the number of application events passed to
+     *            <code>processEvent</code> before output is called (ignoring
+     *            checkpointing events).
+     */
+    public void setCheckpointingFrequencyByEventCount(int checkpointingFrequency) {
+        this.checkpointingFrequency = checkpointingFrequency;
+        this.checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
+        supplementAdviceForCheckpointingAndRecovery();
+    }
+
+    /**
+     * This method sets the output strategy to "output on time boundary" and
+     * specifies the time boundary on which the <code>output</code> should be
+     * called.
+     * <p>
+     * You would not normally call this method directly, but instead via the S4
+     * configuration file.
+     * <p>
+     * <code>outputFrequency</code> specifies the time boundary in seconds.
+     * Whenever the current time is a multiple of <code>outputFrequency</code>,
+     * <code>AbstractProcessor</code> will call your <code>output</code> method.
+     * For example, if you specify an <code>outputFrequency</code> of 3600,
+     * <code>AbstractProcessor</code> will call <code>output</code> on every
+     * hour boundary (e.g., 11:00:00, 12:00:00, 13:00:00, etc.).
+     * <p>
+     * When this output strategy is used, your <code>output</code> method may
+     * occasionally (or frequently) run concurrently with your
+     * <code>processEvent</code> method. Therefore, you should take steps to
+     * protect any data structures that both methods use.
+     * <p>
+     * If you call neither <code>setOutputFrequencyByEventCount</code> nor
+     * <code>setOutputFrequencyByTimeBoundary</code>, the default strategy is
+     * "by event count" with an output frequency of 1. (That is,
+     * <code>output</code> is called after after each return from
+     * <code>processEvent</code>).
+     * 
+     * @param outputFrequency
+     *            the time boundary in seconds
+     **/
+    public void setOutputFrequencyByTimeBoundary(int outputFrequency) {
+        this.outputFrequency = outputFrequency;
+        this.outputFrequencyType = FrequencyType.TIMEBOUNDARY;
+        initFrequency(PeriodicInvokerType.OUTPUT);
+    }
+
+    /**
+     * Sets the frequency of checkpointing. It uses the same mechanism than
+     * {@link #setOutputFrequencyByTimeBoundary(int)}
+     * 
+     * @param checkpointingFrequency
+     *            the time boundary in seconds
+     */
+    public void setCheckpointingFrequencyByTimeBoundary(int checkpointingFrequency) {
+        this.checkpointingFrequency = checkpointingFrequency;
+        this.checkpointingFrequencyType = FrequencyType.TIMEBOUNDARY;
+        supplementAdviceForCheckpointingAndRecovery();
+        initFrequency(PeriodicInvokerType.CHECKPOINTING);
+    }
+
+    /**
+     * Set the offset from the time boundary at which
+     * <code>AbstractProcessor</code> should call <code>output</code>.
+     * <p>
+     * This value is honored only if the "output on time boundary" output
+     * strategy is used.
+     * <p>
+     * As an example, if you specify an <code>outputFrequency</code> of 3600 and
+     * an <code>outputFrequencyOffset</code> of 7,
+     * <code>AbstractProcessor</code> will call <code>output</code> on every
+     * hour boundary plus 7 seconds (e.g., 11:00:07, 12:00:07, 13:00:07, etc.).
+     **/
+    public void setOutputFrequencyOffset(int outputFrequencyOffset) {
+        this.outputFrequencyOffset = outputFrequencyOffset;
+    }
+
+    /**
+     * Set the offset from the time boundary at which calls to checkpoint should
+     * be performed. It uses the same mechanism than
+     * {@link AbstractPE#setOutputFrequencyOffset(int)}
+     * 
+     * @param checkpointingFrequencyOffset
+     *            checkpointing frequency offset in seconds
+     */
+    public void setCheckpointingFrequencyOffset(int checkpointingFrequencyOffset) {
+        this.checkpointingFrequencyOffset = checkpointingFrequencyOffset;
+        supplementAdviceForCheckpointingAndRecovery();
+    }
+
+    public void setKeys(String[] keys) {
+        for (String key : keys) {
+            StringTokenizer st = new StringTokenizer(key);
+            eventAdviceList.add(new EventAdvice(st.nextToken(), st.nextToken()));
+        }
+        supplementAdviceForCheckpointingAndRecovery();
+    }
+
+    private void initFrequency(PeriodicInvokerType type) {
+        Runnable r = null;
+        if (PeriodicInvokerType.OUTPUT.equals(type)) {
+            if (outputFrequency < 0) {
+                return;
+            }
+
+            if (outputFrequencyType == FrequencyType.TIMEBOUNDARY) {
+                // create a thread that calls output on time boundaries
+                // that are multiples of frequency
+                r = new PeriodicInvoker(type);
+
+            }
+        } else {
+            if (checkpointingFrequency < 0) {
+                return;
+            }
+            if (checkpointingFrequencyType == FrequencyType.TIMEBOUNDARY) {
+                r = new PeriodicInvoker(type);
+            }
+        }
+        if (r != null) {
+            Thread t = new Thread(r, type.getName());
+            t.start();
+        }
+    }
+
+    /**
+     * This method exists simply to make <code>clone()</code> public.
+     */
+    public Object clone() {
+        try {
+            Object clone = super.clone();
+            return clone;
+        } catch (CloneNotSupportedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void setTtl(int ttl) {
+        this.ttl = ttl;
+    }
+
+    /**
+     * 
+     */
+    public int getTtl() {
+        return ttl;
+    }
+
+    public List<EventAdvice> advise() {
+        return eventAdviceList;
+    }
+
+    /**
+     * 
+     */
+    public void setLookupTable(Persister lookupTable) {
+        this.lookupTable = lookupTable;
+    }
+
+    /**
+     * You implement this abstract method in your subclass. This is the part of
+     * your processor that outputs data (e.g., by writing the data to the
+     * cache). The <code>output</code> method may further process the data
+     * (e.g., aggregate it) before outputting it.
+     **/
+    abstract public void output();
+
+    protected void checkpoint() {
+
+        byte[] serializedState = serializeState();
+        // NOTE: assumes pe id is keyvalue from the PE...
+        saveState(getSafeKeeperId(), serializedState);
+        // remove dirty flag
+        checkpointable = false;
+    }
+
+    private void saveState(SafeKeeperId key, byte[] serializedState) {
+        safeKeeper.saveState(key, serializedState);
+    }
+
+    protected void recover() {
+        byte[] serializedState = null;
+        try {
+            serializedState = safeKeeper.fetchSerializedState(getSafeKeeperId());
+        } catch (RuntimeException e) {
+            Logger.getLogger("s4-ft").error("Cannot fetch serialized stated for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
+        }
+        if (serializedState == null) {
+            return;
+        }
+        try {
+            AbstractPE peInOldState = deserializeState(serializedState);
+            restoreState(peInOldState);
+        } catch (RuntimeException e) {
+            Logger.getLogger("s4-ft").error("Cannot restore state for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
+        }
+    }
+
+    public SafeKeeperId getSafeKeeperId() {
+        return new SafeKeeperId(getId(), getKeyValueString());
+    }
+
+    public void setSafeKeeper(SafeKeeper safeKeeper) {
+        this.safeKeeper = safeKeeper;
+        if (safeKeeper != null) {
+            this.safeKeeperSetSignal.countDown();
+        }
+    }
+
+    public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
+        isCheckpointingEvent = true;
+        if (isCheckpointable()) {
+            checkpoint();
+        }
+    }
+
+    protected boolean isCheckpointable() {
+        return checkpointable;
+    }
+
+    protected void setCheckpointable(boolean checkpointable) {
+        this.checkpointable = checkpointable;
+    }
+
+    public final void initiateCheckpoint() {
+        // enqueue checkpointing event
+        if (safeKeeper != null) {
+            safeKeeper.generateCheckpoint(this);
+        }
+    }
+
+    public byte[] serializeState() {
+        return safeKeeper.getSerializer().serialize(this);
+    }
+
+    public AbstractPE deserializeState(byte[] loadedState) {
+        return (AbstractPE) safeKeeper.getSerializer().deserialize(loadedState);
+    }
+
+    public void restoreState(AbstractPE oldState) {
+        restoreFieldsForClass(oldState.getClass(), oldState);
+    }
+
+    private void restoreFieldsForClass(Class currentInOldStateClassHierarchy, AbstractPE oldState) {
+        if (!AbstractPE.class.isAssignableFrom(currentInOldStateClassHierarchy)) {
+            return;
+        } else {
+            Field[] fields = oldState.getClass().getDeclaredFields();
+            for (Field field : fields) {
+                if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
+                    if (!Modifier.isPublic(field.getModifiers())) {
+                        field.setAccessible(true);
+                    }
+                    try {
+                        // TODO use reflectasm
+                        field.set(this, field.get(oldState));
+                    } catch (IllegalArgumentException e) {
+                        Logger.getLogger("s4-ft").error("Cannot recover old state for this PE [" + this + "]", e);
+                        return;
+                    } catch (IllegalAccessException e) {
+                        Logger.getLogger("s4-ft").error("Cannot recover old state for this PE [" + this + "]", e);
+                        return;
+                    }
+
+                }
+            }
+            restoreFieldsForClass(currentInOldStateClassHierarchy.getSuperclass(), oldState);
+        }
+    }
+
+    /**
+     * Subscribes this PE to the checkpointing stream
+     */
+    private void supplementAdviceForCheckpointingAndRecovery() {
+        // don't do anything until both conditions are true
+        Logger.getLogger("s4").info(
+                "Maybe adding for " + this.getId() + ": " + checkpointingFrequency + " and " + eventAdviceList.size());
+        if (checkpointingFrequency > 0 && eventAdviceList.size() > 0) {
+            eventAdviceList.add(new EventAdvice(this.getId() + "_checkpointing", "key"));
+        }
+    }
+
+    public void processEvent(RecoveryEvent recoveryEvent) {
+        isCheckpointingEvent = true;
+        recover();
+    }
+    
+    /**
+     * This method expires the current PE.
+     **/
+    protected void expire() {
+        this.prototypeWrapper.expire(this.keyValueString);
+    }
+
+    class PeriodicInvoker implements Runnable {
+
+        PeriodicInvokerType type;
+
+        public PeriodicInvoker(PeriodicInvokerType type) {
+            this.type = type;
+        }
+
+        public long getFrequencyInMillis() {
+            if (type.equals(PeriodicInvokerType.OUTPUT)) {
+                return outputFrequency * 1000;
+            } else {
+                return checkpointingFrequency * 1000;
+            }
+        }
+
+        public long getFrequencyOffset() {
+            if (type.equals(PeriodicInvokerType.OUTPUT)) {
+                return outputFrequencyOffset;
+            } else {
+                return checkpointingFrequencyOffset;
+            }
+        }
+
+        public void run() {
+            if (clock == null) {
+                try {
+                    s4ClockSetSignal.await();
+                } catch (InterruptedException e) {
+                }
+            }
+            if (PeriodicInvokerType.CHECKPOINTING.equals(type) && safeKeeper == null) {
+                try {
+                    safeKeeperSetSignal.await();
+                } catch (InterruptedException e) {
+                }
+            }
+
+            int outputCount = 0;
+            int checkpointCount = 0;
+            long frequencyInMillis = getFrequencyInMillis();
+
+            long currentTime = getCurrentTime();
+            while (!Thread.interrupted()) {
+                long currentBoundary = (currentTime / frequencyInMillis) * frequencyInMillis;
+                long nextBoundary = currentBoundary + frequencyInMillis;
+                currentTime = clock.waitForTime(nextBoundary + (outputFrequencyOffset * 1000));
+                if (lookupTable != null) {
+                    Set peKeys = lookupTable.keySet();
+                    for (Iterator it = peKeys.iterator(); it.hasNext();) {
+                        String peKey = (String) it.next();
+                        AbstractPE pe = null;
+                        try {
+                            pe = (AbstractPE) lookupTable.get(peKey);
+                        } catch (InterruptedException ie) {
+                        }
+
+                        if (pe == null) {
+                            continue;
+                        }
+
+                        if (PeriodicInvokerType.OUTPUT.equals(type)) {
+                            try {
+                                pe.output();
+                                outputCount++;
+                            } catch (Exception e) {
+                                Logger.getLogger("s4").error("Exception calling output() method", e);
+                            }
+
+                            if (outputCount == outputsBeforePause) {
+                                if (logPauses) {
+                                    Logger.getLogger("s4").info(
+                                            "Pausing " + getId() + " at count " + outputCount + " for "
+                                                    + pauseTimeInMillis + " milliseconds");
+                                }
+                                outputCount = 0;
+                                try {
+                                    Thread.sleep(pauseTimeInMillis);
+                                } catch (InterruptedException ie) {
+                                    Thread.currentThread().interrupt();
+                                }
+                            }
+                        } else if (PeriodicInvokerType.CHECKPOINTING.equals(type)) {
+                            try {
+                                if (pe.isCheckpointable()) {
+                                    pe.initiateCheckpoint();
+                                    checkpointCount++;
+                                }
+                            } catch (Exception e) {
+                                e.printStackTrace();
+                                Logger.getLogger("s4").error("Exception calling checkpoint() method", e);
+                            }
+
+                            if (checkpointCount == checkpointsBeforePause) {
+                                if (logPauses) {
+                                    Logger.getLogger("s4").info(
+                                            "Pausing " + getId() + " at checkpoint count " + checkpointCount + " for "
+                                                    + checkpointingPauseTimeInMillis + " milliseconds");
+                                }
+                                checkpointCount = 0;
+                                try {
+                                    Thread.sleep(checkpointingPauseTimeInMillis);
+                                } catch (InterruptedException ie) {
+                                    Thread.currentThread().interrupt();
+                                }
+                            }
+                        } // end for each pe in lookup table
+                    } // end if lookup table is not null
+                }
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java
new file mode 100644
index 0000000..18f19e1
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.util.SlotUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+public abstract class AbstractWindowingPE extends AbstractPE {
+    private String slotClassName;
+    private int slotSize = 3600; // default one hour
+    private int windowSize = slotSize * 24; // default, 24 hours
+    private Map<String, String> timestampFields;
+
+    private long lastTimestamp = -1;
+    private Map<Long, Slot> slots;
+    private SlotUtils slotUtils;
+    private Class slotClass;
+
+    public void setSlotClassName(String slotClassName) {
+        this.slotClassName = slotClassName;
+    }
+
+    public void setSlotSize(int slotSize) {
+        this.slotSize = slotSize;
+    }
+
+    public int getSlotSize() {
+        return slotSize;
+    }
+
+    public void setWindowSize(int windowSize) {
+        this.windowSize = windowSize;
+    }
+    
+    public int getWindowSize() {
+        return windowSize;
+    }
+
+    public void setTimestampFields(String[] timestampFieldsArray) {
+        timestampFields = new HashMap<String, String>();
+        for (String timeStampFieldInfo : timestampFieldsArray) {
+            StringTokenizer st = new StringTokenizer(timeStampFieldInfo);
+            timestampFields.put(st.nextToken(), st.nextToken());
+        }
+    }
+
+    private OverloadDispatcherSlot overloadDispatcher;
+
+    public AbstractWindowingPE() {
+    }
+
+    public void init() {
+        // this reference will be shared amongst all instances of the pe
+        slotUtils = new SlotUtils(slotSize);
+
+        try {
+            slotClass = Class.forName(slotClassName);
+        } catch (ClassNotFoundException cnfe) {
+            throw new RuntimeException(cnfe);
+        }
+
+        OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(slotClass,
+                                                                           true);
+        Class<?> overloadDispatcherClass = oldg.generate();
+
+        try {
+            overloadDispatcher = (OverloadDispatcherSlot) overloadDispatcherClass.newInstance();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void processEvent(Object event) {
+        long currentTime = getCurrentTime();
+        long maybeCurrentTime = -1;
+        if (timestampFields != null) {
+            Schema schema = schemaContainer.getSchema(event.getClass());
+            String fieldName = timestampFields.get(getStreamName());
+            if (fieldName != null) {
+                Property property = schema.getProperties().get(fieldName);
+                if (property != null
+                        && (property.getType().equals(Long.TYPE) || property.getType()
+                                                                            .equals(Long.class))) {
+                    try {
+                        maybeCurrentTime = (Long) property.getGetterMethod()
+                                                          .invoke(event);
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+
+        if (maybeCurrentTime > -1) {
+            currentTime = maybeCurrentTime;
+            lastTimestamp = currentTime;
+        }
+
+        long slotTime = slotUtils.getSlotAtTime(currentTime / 1000); // convert
+                                                                     // to
+                                                                     // seconds
+
+        if (slots == null) {
+            slots = Collections.synchronizedMap(new HashMap<Long, Slot>());
+        }
+
+        Slot slot = slots.get(slotTime);
+        if (slot == null) {
+            try {
+                slot = (Slot) slotClass.newInstance();
+            } catch (IllegalAccessException iae) {
+                throw new RuntimeException(iae);
+            } catch (InstantiationException ie) {
+                throw new RuntimeException(ie);
+            }
+            slots.put(slotTime, slot);
+        }
+
+        overloadDispatcher.dispatch(slot, event, slotTime, this);
+    }
+
+    public Map<Long, Slot> getSlots() {
+        pruneSlots(getCurrentTime() / 1000);
+
+        return Collections.unmodifiableMap(slots);
+    }
+
+    private void pruneSlots(long time) {
+        HashSet<Long> keys = new HashSet<Long>();
+
+        synchronized (slots) {
+            for (Long key : slots.keySet()) {
+                keys.add(key);
+            }
+        }
+
+        for (Long key : keys) {
+            if (slotUtils.isOutsideWindow(key, windowSize, time)) {
+                slots.remove(key);
+            }
+        }
+    }
+
+    public boolean isCurrentSlot(long slotTime) {
+        long currentSlot = slotUtils.getSlotAtTime(getCurrentTime() / 1000);
+        if (currentSlot == slotTime) {
+            return true;
+        }
+        return false;
+    }
+    
+    public boolean isOutsideWindow(long time) {
+        Long slotIndexAtTime = slotUtils.getSlotAtTime(time/1000);
+        return slotUtils.isOutsideWindow(slotIndexAtTime, windowSize, getCurrentTime()/1000);
+    }
+
+    public Long getSlotAtOffset(int offset) {
+        return slotUtils.getSlot(offset, getCurrentTime() / 1000);
+    }
+    
+    public Slot getSlotAtTime(long time) {
+        pruneSlots(getCurrentTime()/1000);
+        Long slotIndex = slotUtils.getSlotAtTime(time/1000);
+        return slots.get(slotIndex);
+    }
+    
+    public Long getSlotTimeForTime(long time) {
+        return slotUtils.getSlotAtTime(time/1000);
+    }
+
+    public static interface Slot {
+        // public void processEvent(Object event, long slotTime,
+        // AbstractWindowingPE pe);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java b/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java
new file mode 100644
index 0000000..ce8dad5
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.collector.EventWrapper;
+
+public interface AsynchronousEventProcessor {
+
+    void queueWork(EventWrapper eventWrapper);
+
+    // This will always be called by a different thread than the one executing
+    // run()
+    int getQueueSize();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java b/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java
new file mode 100644
index 0000000..077c693
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.message.PrototypeRequest;
+import org.apache.s4.message.Response;
+import org.apache.s4.message.SinglePERequest;
+
+import java.util.List;
+
+/**
+ * Processes control events.
+ */
+public class ControlEventProcessor {
+
+    private EventDispatcher dispatcher;
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void process(EventWrapper e, PrototypeWrapper p) {
+        String en = e.getStreamName(); // e.g. "#joinPe01"
+        String pn = p.getId(); // e.g. "JoinPE01"
+
+        // stream name has to match PE's ID (modulo case).
+        // e.g. "#joinPe01" will match "JoinPE01"
+        if (!en.regionMatches(true, 1, pn, 0, pn.length()))
+            return;
+
+        execute(e, p);
+    }
+
+    protected void execute(EventWrapper e, PrototypeWrapper p) {
+        List<CompoundKeyInfo> keyInfoList = e.getCompoundKeys();
+        Object event = e.getEvent();
+        
+        if (event instanceof SinglePERequest) {
+            // Handle Requests to individual PEs
+            if (keyInfoList.isEmpty())
+                return;
+
+            CompoundKeyInfo keyInfo = keyInfoList.get(0);
+
+            String keyVal = keyInfo.getCompoundValue();
+
+            AbstractPE pe = p.lookupPE(keyVal);
+
+            Response response = ((SinglePERequest) event).evaluate(pe);
+            String stream = response.getRInfo().getStream();
+
+            dispatcher.dispatchEvent(stream, response);
+
+        } else if (event instanceof PrototypeRequest) {
+            // Or handle aggregate requests to Prototypes.
+            Response response = ((PrototypeRequest) event).evaluate(p);
+            String stream = response.getRInfo().getStream();
+
+            dispatcher.dispatchEvent(stream, response);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java b/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java
new file mode 100644
index 0000000..51f318f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+public class EventAdvice {
+    String eventName;
+    String key;
+
+    public EventAdvice(String eventName, String key) {
+        this.eventName = eventName;
+        this.key = key;
+    }
+
+    public String toString() {
+        return eventName + ":{" + key + "}";
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getEventName() {
+        return eventName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java b/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java
new file mode 100644
index 0000000..4386699
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+public class JoinPE extends AbstractPE {
+    private static Logger logger = Logger.getLogger(JoinPE.class);
+    private Map<String, List<String>> eventFields = new HashMap<String, List<String>>();
+    private Map<String, Object> eventsToJoin;
+    private EventDispatcher dispatcher;
+    private Monitor monitor;
+    private String outputStreamName;
+    private String outputClassName;
+    private Class<?> outputClass;
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
+
+    public void setOutputStreamName(String outputStreamName) {
+        this.outputStreamName = outputStreamName;
+    }
+
+    public String getOutputClassName() {
+        return outputClassName;
+    }
+
+    public void setOutputClassName(String outputClassName) {
+        this.outputClassName = outputClassName;
+        try {
+            this.outputClass = Class.forName(this.outputClassName);
+        } catch (ClassNotFoundException cnfe) {
+            throw new RuntimeException(cnfe);
+        }
+    }
+
+    public void setIncludeFields(String[] includeFields) {
+        for (String includeField : includeFields) {
+            StringTokenizer st = new StringTokenizer(includeField);
+            if (st.countTokens() != 2) {
+                Logger.getLogger("s4").error("Bad include field specified: "
+                        + includeField);
+                continue;
+            }
+
+            String eventName = st.nextToken();
+            String fieldName = st.nextToken();
+
+            List<String> fieldNames = eventFields.get(eventName);
+            if (fieldNames == null) {
+                fieldNames = new ArrayList<String>();
+                eventFields.put(eventName, fieldNames);
+            }
+
+            if (fieldName.equals("*")) {
+                fieldNames.clear();
+                fieldNames.add("*");
+            } else {
+                fieldNames.add(fieldName);
+            }
+        }
+    }
+
+    @Override
+    public void output() {
+        // TODO Auto-generated method stub
+
+    }
+
+    private SchemaContainer schemaContainer = new SchemaContainer();
+
+    public void processEvent(Object event) {
+        if (eventsToJoin == null) {
+            eventsToJoin = new HashMap<String, Object>();
+        }
+        List<String> fieldNames = eventFields.get(getStreamName());
+        if (fieldNames == null) {
+            return;
+        }
+
+        // we only use the last event that comes through on the given stream
+        eventsToJoin.put(getStreamName(), event);
+
+        if (eventsToJoin.keySet().size() == eventFields.keySet().size()) {
+            Object newEvent = null;
+            try {
+                newEvent = outputClass.newInstance();
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+
+            Schema newEventSchema = schemaContainer.getSchema(newEvent.getClass());
+
+            for (String streamName : eventsToJoin.keySet()) {
+                Object partialEvent = eventsToJoin.get(streamName);
+                Schema partialEventSchema = schemaContainer.getSchema(partialEvent.getClass());
+
+                List<String> includeFields = eventFields.get(streamName);
+                if (includeFields.size() == 1
+                        && includeFields.get(0).equals("*")) {
+                    for (Property partialEventProperty : partialEventSchema.getProperties()
+                                                                           .values()) {
+                        copyField(partialEventProperty.getName(),
+                                  partialEventSchema,
+                                  newEventSchema,
+                                  partialEvent,
+                                  newEvent);
+                    }
+                } else {
+                    for (String includeField : includeFields) {
+                        copyField(includeField,
+                                  partialEventSchema,
+                                  newEventSchema,
+                                  partialEvent,
+                                  newEvent);
+                    }
+                }
+            }
+
+            dispatcher.dispatchEvent(outputStreamName, newEvent);
+            if (logger.isDebugEnabled()) {
+                logger.debug("STEP 7 (JoinPE): " + newEvent.toString());
+            }
+        }
+    }
+
+    private void copyField(String propertyName, Schema sourceSchema,
+                           Schema targetSchema, Object source, Object target) {
+        Property sourceProperty = sourceSchema.getProperties()
+                                              .get(propertyName);
+        Property targetProperty = targetSchema.getProperties()
+                                              .get(propertyName);
+
+        if (sourceProperty == null || targetProperty == null
+                || !sourceProperty.getType().equals(targetProperty.getType())) {
+            throw new RuntimeException("Specified property " + propertyName
+                    + " doesn't exist or is not consistent");
+        }
+
+        try {
+            Object sourceValue = sourceProperty.getGetterMethod()
+                                               .invoke(source);
+            if (sourceValue == null) {
+                return;
+            }
+            if (sourceProperty.getType().isPrimitive()) {
+                if (sourceValue instanceof Number) {
+                    if (((Number) sourceValue).doubleValue() == 0.0) {
+                        return;
+                    }
+                }
+                if (sourceValue instanceof Boolean) {
+                    if (((Boolean) sourceValue).equals(Boolean.FALSE)) {
+                        return;
+                    }
+                }
+            }
+            targetProperty.getSetterMethod().invoke(target, sourceValue);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java b/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java
new file mode 100644
index 0000000..f097977
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+public interface OutputFormatter {
+    public Object format(Object outputValue);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java
new file mode 100644
index 0000000..4b62bf8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+public interface OverloadDispatcher {
+    public void dispatch(Object pe, Object event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java
new file mode 100644
index 0000000..9cd2994
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+import java.io.FileOutputStream;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.bcel.Constants;
+import org.apache.bcel.classfile.JavaClass;
+import org.apache.bcel.generic.BranchInstruction;
+import org.apache.bcel.generic.ClassGen;
+import org.apache.bcel.generic.ConstantPoolGen;
+import org.apache.bcel.generic.INSTANCEOF;
+import org.apache.bcel.generic.InstructionFactory;
+import org.apache.bcel.generic.InstructionHandle;
+import org.apache.bcel.generic.InstructionList;
+import org.apache.bcel.generic.MethodGen;
+import org.apache.bcel.generic.ObjectType;
+import org.apache.bcel.generic.Type;
+
+public class OverloadDispatcherGenerator {
+    private List<Hierarchy> hierarchies = new ArrayList<Hierarchy>();
+    private Class<?> targetClass;
+    private boolean forSlot = false;
+    private ObjectType abstractWindowingPEType = new ObjectType("org.apache.s4.processor.AbstractWindowingPE");
+    private String classDumpFile;
+
+    public void setClassDumpFile(String classDumpFile) {
+        this.classDumpFile = classDumpFile;
+    }
+
+    public OverloadDispatcherGenerator(Class targetClass) {
+        this(targetClass, false);
+    }
+
+    public OverloadDispatcherGenerator(Class targetClass, boolean forSlot) {
+        this.targetClass = targetClass;
+        this.forSlot = forSlot;
+
+        for (Method method : targetClass.getMethods()) {
+            if (method.getName().equals("processEvent")
+                    && method.getReturnType().equals(Void.TYPE)) {
+                this.addHierarchy((Class<?>) method.getParameterTypes()[0]);
+            }
+        }
+        Collections.sort(hierarchies);
+    }
+
+    public void addHierarchy(Class<?> clazz) {
+        hierarchies.add(new Hierarchy(clazz));
+    }
+
+    public Class<Object> generate() {
+        Random rand = new Random(System.currentTimeMillis());
+        String dispatcherClassName = "OverloadDispatcher"
+                + (Math.abs(rand.nextInt() % 3256));
+
+        String interfaceName = "org.apache.s4.processor.OverloadDispatcher";
+        if (forSlot) {
+            interfaceName = "org.apache.s4.processor.OverloadDispatcherSlot";
+        }
+
+        ClassGen cg = new ClassGen(dispatcherClassName,
+                                   "java.lang.Object",
+                                   dispatcherClassName + ".java",
+                                   Constants.ACC_PUBLIC | Constants.ACC_SUPER,
+                                   new String[] { interfaceName });
+        ConstantPoolGen cp = cg.getConstantPool();
+        InstructionFactory instFactory = new InstructionFactory(cg, cp);
+
+        InstructionList il = new InstructionList();
+
+        // build constructor method for new class
+        MethodGen constructor = new MethodGen(Constants.ACC_PUBLIC,
+                                              Type.VOID,
+                                              Type.NO_ARGS,
+                                              new String[] {},
+                                              "<init>",
+                                              dispatcherClassName,
+                                              il,
+                                              cp);
+        il.append(InstructionFactory.createLoad(Type.OBJECT, 0));
+        il.append(instFactory.createInvoke("java.lang.Object",
+                                           "<init>",
+                                           Type.VOID,
+                                           Type.NO_ARGS,
+                                           Constants.INVOKESPECIAL));
+
+        il.append(InstructionFactory.createReturn(Type.VOID));
+        constructor.setMaxStack();
+        constructor.setMaxLocals();
+        cg.addMethod(constructor.getMethod());
+        il.dispose();
+
+        // build dispatch method
+        il = new InstructionList();
+
+        Type[] dispatchArgumentTypes = null;
+        String[] dispatchArgumentNames = null;
+        int postArgumentVariableSlot = 3;
+        if (forSlot) {
+            dispatchArgumentTypes = new Type[] { ObjectType.OBJECT,
+                    ObjectType.OBJECT, ObjectType.LONG, abstractWindowingPEType };
+            dispatchArgumentNames = new String[] { "slot", "event", "slotTime",
+                    "pe" };
+            postArgumentVariableSlot = 6;
+        } else {
+            dispatchArgumentTypes = new Type[] { ObjectType.OBJECT,
+                    ObjectType.OBJECT };
+            dispatchArgumentNames = new String[] { "pe", "event" };
+
+        }
+
+        MethodGen method = new MethodGen(Constants.ACC_PUBLIC,
+                                         Type.VOID,
+                                         dispatchArgumentTypes,
+                                         dispatchArgumentNames,
+                                         "dispatch",
+                                         dispatcherClassName,
+                                         il,
+                                         cp);
+
+        List<InstructionHandle> targetInstructions = new ArrayList<InstructionHandle>();
+        List<BranchInstruction> branchInstructions = new ArrayList<BranchInstruction>();
+        List<BranchInstruction> gotoInstructions = new ArrayList<BranchInstruction>();
+
+        ObjectType peType = new ObjectType(targetClass.getName());
+
+        il.append(InstructionFactory.createLoad(Type.OBJECT, 1));
+        il.append(instFactory.createCheckCast(peType));
+        il.append(InstructionFactory.createStore(peType,
+                                                 postArgumentVariableSlot));
+
+        for (int i = 0; i < hierarchies.size(); i++) {
+            Hierarchy hierarchy = hierarchies.get(i);
+
+            ObjectType hierarchyTop = new ObjectType(hierarchy.getTop()
+                                                              .getName());
+
+            InstructionHandle ih = il.append(InstructionFactory.createLoad(Type.OBJECT,
+                                                                           2));
+            if (i > 0) {
+                targetInstructions.add(ih);
+            }
+
+            il.append(new INSTANCEOF(cp.addClass(hierarchyTop)));
+            BranchInstruction bi = InstructionFactory.createBranchInstruction(Constants.IFEQ,
+                                                                              null);
+            il.append(bi);
+            branchInstructions.add(bi);
+
+            il.append(InstructionFactory.createLoad(peType,
+                                                    postArgumentVariableSlot));
+            il.append(InstructionFactory.createLoad(hierarchyTop, 2));
+            il.append(instFactory.createCheckCast(hierarchyTop));
+            if (forSlot) {
+                il.append(InstructionFactory.createLoad(ObjectType.LONG, 3));
+                il.append(InstructionFactory.createLoad(abstractWindowingPEType,
+                                                        5));
+            }
+
+            Type[] argumentTypes = null;
+            if (forSlot) {
+                argumentTypes = new Type[] { hierarchyTop, ObjectType.LONG,
+                        abstractWindowingPEType };
+            } else {
+                argumentTypes = new Type[] { hierarchyTop };
+            }
+            il.append(instFactory.createInvoke(targetClass.getName(),
+                                               "processEvent",
+                                               Type.VOID,
+                                               argumentTypes,
+                                               Constants.INVOKEVIRTUAL));
+
+            // no branch needed for last check
+            if (i < (hierarchies.size() - 1)) {
+                bi = InstructionFactory.createBranchInstruction(Constants.GOTO,
+                                                                null);
+                il.append(bi);
+                gotoInstructions.add(bi);
+            }
+        }
+
+        InstructionHandle returnInstruction = il.append(InstructionFactory.createReturn(Type.VOID));
+
+        for (int i = 0; i < targetInstructions.size(); i++) {
+            branchInstructions.get(i).setTarget(targetInstructions.get(i));
+        }
+
+        branchInstructions.get(branchInstructions.size() - 1)
+                          .setTarget(returnInstruction);
+
+        for (BranchInstruction gotoInstruction : gotoInstructions) {
+            gotoInstruction.setTarget(returnInstruction);
+        }
+
+        method.setMaxStack();
+        method.setMaxLocals();
+        cg.addMethod(method.getMethod());
+        il.dispose();
+
+        JavaClass jc = cg.getJavaClass();
+        OverloadDispatcherClassLoader cl = new OverloadDispatcherClassLoader();
+
+        // debug
+        if (classDumpFile != null) {
+            FileOutputStream fos = null;
+            try {
+                fos = new FileOutputStream(classDumpFile);
+                fos.write(jc.getBytes());
+            } catch (Exception e) {
+                e.printStackTrace();
+            } finally {
+                if (fos != null)
+                    try {
+                        fos.close();
+                    } catch (Exception e) {
+                    }
+            }
+        }
+
+        return cl.loadClassFromBytes(dispatcherClassName, jc.getBytes());
+
+    }
+
+    public static class Hierarchy implements Comparable<Hierarchy> {
+        private List<Class<?>> classes = new ArrayList<Class<?>>();
+
+        public Hierarchy(Class<?> clazz) {
+            for (Class<?> currentClass = clazz; currentClass != null; currentClass = (Class) currentClass.getSuperclass()) {
+                classes.add(currentClass);
+            }
+        }
+
+        public Class<?> getTop() {
+            if (classes.size() < 1) {
+                return null;
+            }
+            return classes.get(0);
+        }
+
+        public List<Class<?>> getClasses() {
+            return classes;
+        }
+
+        public boolean equals(Hierarchy other) {
+            if (classes.size() != other.classes.size()) {
+                return false;
+            }
+
+            for (int i = 0; i < classes.size(); i++) {
+                if (!classes.get(i).equals(other.classes.get(i))) {
+                    return false;
+                }
+            }
+
+            return true;
+        }
+
+        public int compareTo(Hierarchy other) {
+            if (this.equals(other)) {
+                return 0;
+            } else if (this.containsClass(other.getTop())) {
+                return -1;
+            }
+
+            return 1;
+        }
+
+        private boolean containsClass(Class<?> other) {
+            for (Class<?> clazz : classes) {
+                if (clazz.equals(other)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+    }
+
+    public static class OverloadDispatcherClassLoader extends URLClassLoader {
+        public OverloadDispatcherClassLoader() {
+            super(new URL[] {});
+        }
+
+        public Class loadClassFromBytes(String name, byte[] bytes) {
+            try {
+                return this.loadClass(name);
+            } catch (ClassNotFoundException cnfe) {
+                // expected
+            }
+            return this.defineClass(name, bytes, 0, bytes.length);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java
new file mode 100644
index 0000000..b443aa5
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.processor;
+
+public interface OverloadDispatcherSlot {
+    public void dispatch(Object slot, Object event, long slotTime, AbstractWindowingPE pe);
+}