You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC
[38/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
new file mode 100644
index 0000000..3dfea86
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.persist;
+
+import org.apache.s4.util.clock.Clock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+public class HashMapPersister implements Persister {
+ private volatile int persistCount = 0;
+ private boolean selfClean = false;
+ private int cleanWaitTime = 40; // 40 seconds by default
+ private String loggerName = "s4";
+ Map<String, CacheEntry> cache;
+ Clock s4Clock;
+
+ private int startCapacity = 5000;
+
+ public void setStartCapacity(int startCapacity) {
+ this.startCapacity = startCapacity;
+ }
+
+ public int getStartCapacity() {
+ return startCapacity;
+ }
+
+ public void setSelfClean(boolean selfClean) {
+ this.selfClean = selfClean;
+ }
+
+ public void setCleanWaitTime(int cleanWaitTime) {
+ this.cleanWaitTime = cleanWaitTime;
+ }
+
+ public void setLoggerName(String loggerName) {
+ this.loggerName = loggerName;
+ }
+
+ public HashMapPersister(Clock s4Clock) {
+ this.s4Clock = s4Clock;
+ }
+
+ public void setS4Clock(Clock s4Clock) {
+ this.s4Clock = s4Clock;
+ }
+
+ public void init() {
+ cache = Collections.synchronizedMap(new HashMap<String, CacheEntry>(this.getStartCapacity()));
+
+ if (selfClean) {
+ Runnable r = new Runnable() {
+ public void run() {
+ while (!Thread.interrupted()) {
+ int cleanCount = HashMapPersister.this.cleanOutGarbage();
+ Logger.getLogger(loggerName).info("Cleaned out "
+ + cleanCount + " entries; Persister has "
+ + cache.size() + " entries");
+ try {
+ Thread.sleep(cleanWaitTime * 1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ t.setPriority(Thread.MIN_PRIORITY);
+ }
+ }
+
+ public int getQueueSize() {
+ return 0;
+ }
+
+ public int getPersistCount() {
+ return persistCount;
+ }
+
+ public int getCacheEntryCount() {
+ return cache.size();
+ }
+
+ public void setAsynch(String key, Object value, int period) {
+ // there really is no asynch for the local cache
+ set(key, value, period);
+ }
+
+ public void set(String key, Object value, int period) {
+ if (value == null) {
+ cache.remove(key);
+ return;
+ }
+
+ synchronized (this) {
+ persistCount++;
+ }
+
+ CacheEntry ce = new CacheEntry();
+ ce.value = value;
+ ce.period = period;
+ ce.addTime = s4Clock.getCurrentTime();
+ cache.put(key, ce);
+ }
+
+ public Object get(String key) {
+ CacheEntry ce = cache.get(key);
+ if (ce == null) {
+ return null;
+ }
+
+ if (ce.isExpired()) {
+ return null;
+ }
+
+ return ce.value;
+ }
+
+ public Map<String, Object> getBulk(String[] keys) {
+ HashMap map = new HashMap<String, Object>();
+ for (String key : keys) {
+ Object value = get(key);
+ if (value != null) {
+ map.put(key, value);
+ }
+ }
+ return map;
+ }
+
+ public Object getObject(String key) {
+ return get(key);
+ }
+
+ public Map<String, Object> getBulkObjects(String[] keys) {
+ return getBulk(keys);
+ }
+
+ public void remove(String key) {
+ cache.remove(key);
+ }
+
+ public int cleanOutGarbage() {
+ int count = 0;
+ List<String> keyList;
+ synchronized (cache) {
+ keyList = new ArrayList<String>(cache.size());
+ for (String key : cache.keySet()) {
+ keyList.add(key);
+ }
+ }
+
+ for (String key : keyList) {
+ CacheEntry ce = cache.get(key);
+ if (ce != null && ce.isExpired()) {
+ count++;
+ cache.remove(key);
+ }
+ }
+ return count;
+ }
+
+ public Set<String> keySet() {
+ Set<String> keys = new HashSet<String>();
+ synchronized (cache) {
+ for (String key : cache.keySet()) {
+ keys.add(key);
+ }
+ }
+ return keys;
+ }
+
+ public class CacheEntry {
+ Object value;
+ long addTime;
+ int period;
+
+ public boolean isExpired() {
+ if (period > 0) {
+ if ((addTime + (1000 * (long) period)) <= s4Clock.getCurrentTime()) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/Persister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/Persister.java b/s4-core/src/main/java/org/apache/s4/persist/Persister.java
new file mode 100644
index 0000000..114043c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/Persister.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.persist;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Defines an interface to a collection of key/value pairs, each of which has a
+ * time-to-live property.
+ **/
+public interface Persister {
+ /**
+ * Returns the number of <code>setAsynch</code>-initiated operations that
+ * are pending or in-progress.
+ *
+ * @return the number of pending set operations
+ **/
+ int getQueueSize();
+
+ /**
+ * Returns the number of <code>set</code> or <code>setAsynch</code> calls
+ * made to this object.
+ *
+ * @return the number of <code>set</code> or <code>setAsynch</code> calls
+ * made to this object.
+ **/
+ int getPersistCount();
+
+ /**
+ * Returns the number of key/value pairs in this Persister object.
+ * <p>
+ * If the underlying technology (e.g., Memcached) does not support this
+ * operation, then the implementation may return 0 or a
+ * <code>java.lang.UnsupportedOperationException</code>.
+ * <p>
+ * This number may include expired entries, depending on the implementation.
+ * Some implementations clean up expired entries only when the value is
+ * requested or at certain time intervals (or not at all).
+ *
+ * @return the number of entries in this Persister
+ **/
+ int getCacheEntryCount();
+
+ /**
+ * Set a value for a key without waiting for the operation to complete.
+ * <p>
+ * This method is useful in the case where the underlying implementation
+ * uses a network but the caller should not be delayed by the network
+ * operation.
+ * <p>
+ * Because the set operation is not neccessarily complete when the method
+ * returns, a subsequent <code>get</code> call may return stale data.
+ * <p>
+ * <code>period</code> specifies the time-to-live for this key/value pair.
+ * For calls to <code>get</code> or <code>getObject</code> after the
+ * specified period, the Persister object will return null.
+ *
+ * @param key
+ * the key
+ * @param value
+ * the value
+ * @param period
+ * the maximum interval of time during which this key/value pair
+ * is valid. Also know as time-to-live. -1 indicates an infinite
+ * time-to-live.
+ **/
+ void setAsynch(String key, Object value, int period);
+
+ /**
+ * Set a value for a key.
+ * <p>
+ * <code>period</code> specifies the time-to-live for this key/value pair.
+ * For calls to <code>get</code> or <code>getObject</code> after the
+ * specified period, the Persister object will return null.
+ *
+ * @param key
+ * the key
+ * @param value
+ * the value
+ * @param period
+ * the maximum interval of time during which this key/value pair
+ * is valid. Also know as time-to-live. -1 indicates an infinite
+ * time-to-live.
+ **/
+ void set(String key, Object value, int period) throws InterruptedException;
+
+ /**
+ * Get the value associated with a specified key.
+ *
+ * If the period (aka time-to-live) for this key/value pair is expired, the
+ * <code>get</code> will return null.
+ *
+ * @param key
+ * the key
+ **/
+ Object get(String key) throws InterruptedException;
+
+ /**
+ * Get the values associated with a list of keys.
+ * <p>
+ * <code>getBulk</code> returns a <code>Map</code> containing an entry for
+ * each key/value pair. The map contains an entry for a specified key only
+ * if that key exists in the Persister and the associated key/value pair is
+ * not yet expired.
+ *
+ * @param keys
+ * a list of keys
+ **/
+ Map<String, Object> getBulk(String[] keys) throws InterruptedException;
+
+ /**
+ * This is a method to help support some implementations whose underlying
+ * technology encodes the value associated with a key. In some cases, the
+ * value may have been stored by a client that uses a form of encoding not
+ * supported by the Persister's implementation. In that case, one might want
+ * the raw, yet-to-be-decoded value associated with the key.
+ * <p>
+ * <code>getObject</code> retrieves the raw, unencoded value associated with
+ * a key. In all other respects, it's the same as {@link Persister#get}.
+ * <p>
+ * It's likely you will never need to call this method.
+ *
+ * If the period (aka time-to-live) for this key/value pair is expired, the
+ * <code>get</code> will return null.
+ **/
+ Object getObject(String key) throws InterruptedException;
+
+ /**
+ * As with {@link Persister#getObject}, this is a method to help support
+ * some implementations whose underlying technology encodes result.
+ * <p>
+ * It's likely you will never need to call this method.
+ * <p>
+ * In all other respects, it is the same as {@link Persister#getBulk}.
+ **/
+ Map<String, Object> getBulkObjects(String[] keys)
+ throws InterruptedException;
+
+ /**
+ * Removes the entry for the specified key from the Persister object.
+ *
+ * @param key
+ * the key of the entry to be removed
+ **/
+ void remove(String key) throws InterruptedException;
+
+ /**
+ * Forces the initiation of the process that cleans up expired entries.
+ * <p>
+ * Normally, you would not call this method. The implementation takes care
+ * of the timing of the cleanup process.
+ * <p>
+ * In some implementations, this method may be a no-op.
+ *
+ **/
+ int cleanOutGarbage() throws InterruptedException;
+
+ /**
+ * Returns a <code>Set</code> of the keys contained in this Persister
+ * object.
+ * <p>
+ * The <code>Set</code> may contain keys for expired entries, depending on
+ * how often the underlying implementation cleans up expired entries.
+ * <p>
+ * The underlying technology may not support this operation. As a result,
+ * some implementations return an empty set.
+ *
+ * @return a <code>Set</code> of the keys contained in this Persister
+ * object.
+ **/
+ Set<String> keySet();
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
new file mode 100644
index 0000000..5852ec8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -0,0 +1,778 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
+import org.apache.s4.ft.InitiateCheckpointingEvent;
+import org.apache.s4.ft.RecoveryEvent;
+import org.apache.s4.ft.SafeKeeper;
+import org.apache.s4.ft.SafeKeeperId;
+import org.apache.s4.persist.Persister;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+import org.apache.s4.util.clock.Clock;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+
+/**
+ * This is the base class for processor classes.
+ * <p>
+ * <code>AbstractProcessor</code> provides output frequency strategies that
+ * allow you to configure the rate at which your processor produces output (see
+ * {@link AbstractPE#setOutputFrequencyByEventCount} and
+ * {@link AbstractPE#setOutputFrequencyByTimeBoundary}.
+ */
+public abstract class AbstractPE implements Cloneable {
+ public static enum FrequencyType {
+ TIMEBOUNDARY("timeboundary"), EVENTCOUNT("eventcount");
+
+ private String name;
+
+ FrequencyType(String name) {
+ this.name = name;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+ }
+
+ public static enum PeriodicInvokerType {
+ OUTPUT, CHECKPOINTING;
+
+ public String getName() {
+ if (OUTPUT == this) {
+ return "PeriodicOutputInvoker";
+ } else {
+ return "PeriodicCheckpointingInvoker";
+ }
+ }
+ }
+
+ transient private Clock clock;
+ // FIXME replaces monitor wait on AbstractPE, for triggering possible extra
+ // thread when checkpointing activated
+ transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
+ transient private int outputFrequency = 1;
+ transient private FrequencyType outputFrequencyType = FrequencyType.EVENTCOUNT;
+ transient private int outputFrequencyOffset = 0;
+ transient private int eventCount = 0;
+ transient private int ttl = -1;
+ transient private Persister lookupTable;
+ transient private List<EventAdvice> eventAdviceList = new ArrayList<EventAdvice>();
+ transient private List<Object> keyValue;
+ transient private List<Object> keyRecord;
+ private String keyValueString;
+ transient private String streamName;
+ transient private boolean saveKeyRecord = false;
+ transient private int outputsBeforePause = -1;
+ transient private long pauseTimeInMillis;
+ transient private boolean logPauses = false;
+ private String id;
+ transient protected SchemaContainer schemaContainer = new SchemaContainer();
+ transient private PrototypeWrapper prototypeWrapper;
+
+ transient private boolean recoveryAttempted = false;
+ // true if state may have changed
+ transient private boolean checkpointable = false;
+ // use a flag for identifying checkpointing events
+ transient private boolean isCheckpointingEvent = false;
+
+ transient private SafeKeeper safeKeeper; // handles fault tolerance
+ transient private CountDownLatch safeKeeperSetSignal = new CountDownLatch(1);
+ transient private int checkpointingFrequency = 0;
+ transient private FrequencyType checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
+ transient private int checkpointingFrequencyOffset = 0;
+ transient private int checkpointableEventCount = 0;
+ transient private int checkpointsBeforePause = -1;
+ transient private long checkpointingPauseTimeInMillis;
+
+ transient private OverloadDispatcher overloadDispatcher;
+
+ public void setSaveKeyRecord(boolean saveKeyRecord) {
+ this.saveKeyRecord = saveKeyRecord;
+ }
+
+ public void setOutputsBeforePause(int outputsBeforePause) {
+ this.outputsBeforePause = outputsBeforePause;
+ }
+
+ public void setCheckpointsBeforePause(int checkpointsBeforePause) {
+ this.checkpointsBeforePause = checkpointsBeforePause;
+ }
+
+ public void setPauseTimeInMillis(long pauseTimeInMillis) {
+ this.pauseTimeInMillis = pauseTimeInMillis;
+ }
+
+ public void setCheckpointingPauseTimeInMillis(long checkpointingPauseTimeInMillis) {
+ this.checkpointingPauseTimeInMillis = checkpointingPauseTimeInMillis;
+ }
+
+ public void setLogPauses(boolean logPauses) {
+ this.logPauses = logPauses;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ if (this.clock != null) {
+ this.s4ClockSetSignal.countDown();
+ }
+ }
+
+ /**
+ * This method will be called after the object is cloned from the prototype
+ * PE. The concrete PE class should override this if it has any special
+ * set-up requirements.
+ */
+ public void initInstance() {
+ // default implementation does nothing.
+ }
+
+ public Clock getClock() {
+ return clock;
+ }
+
+ public void setPrototypeWrapper(PrototypeWrapper prototypeWrapper) {
+ this.prototypeWrapper = prototypeWrapper;
+ }
+
+ public AbstractPE() {
+ OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
+ Class<?> overloadDispatcherClass = oldg.generate();
+
+ try {
+ overloadDispatcher = (OverloadDispatcher) overloadDispatcherClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * You should not override this method. Instead, you need to implement the
+ * <code>processEvent</code> method.
+ **/
+ public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object event) {
+ // if this is the first time through, get the key for this PE
+ if (keyValue == null || saveKeyRecord) {
+ setKeyValue(event, compoundKeyInfo);
+
+ if (compoundKeyInfo != null)
+ keyValueString = compoundKeyInfo.getCompoundValue();
+ }
+
+ this.streamName = streamName;
+
+ if (safeKeeper != null) {
+ // initialize checkpointing event flag
+ this.isCheckpointingEvent = false;
+ if (!recoveryAttempted) {
+ recover();
+ recoveryAttempted = true;
+ }
+ }
+
+ overloadDispatcher.dispatch(this, event);
+
+ if (saveKeyRecord) {
+ keyRecord.clear(); // the PE doesn't need it anymore
+ }
+
+ if (outputFrequencyType == FrequencyType.EVENTCOUNT && outputFrequency > 0 && !isCheckpointingEvent) {
+ eventCount++;
+ if (eventCount % outputFrequency == 0) {
+ try {
+ output();
+ } catch (Exception e) {
+ Logger.getLogger("s4").error("Exception calling output() method in execute()", e);
+ }
+ }
+ }
+
+ // do not take into account checkpointing/recovery trigger messages
+ if (!isCheckpointingEvent) {
+ checkpointable = true; // dirty flag
+ if (checkpointingFrequencyType == FrequencyType.EVENTCOUNT && checkpointingFrequency > 0) {
+ checkpointableEventCount++;
+ if (checkpointableEventCount % checkpointingFrequency == 0) {
+ // for count-based frequency, we directly checkpoint here
+ checkpoint();
+ }
+ }
+
+ }
+ }
+
+ public long getCurrentTime() {
+ return clock.getCurrentTime();
+ }
+
+ /**
+ * This method returns the key value associated with this PE.
+ * <p>
+ * The key value is a list because the key may be a compound (composite)
+ * key, in which case the key will have one value for each simple key.
+ *
+ * @return the key value as a List of Objects (each element contains the
+ * value of a simple key).
+ **/
+ public List<Object> getKeyValue() {
+ return keyValue;
+ }
+
+ public List<Object> getKeyRecord() {
+ return keyRecord;
+ }
+
+ public String getKeyValueString() {
+ return keyValueString;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ private void setKeyValue(Object event, CompoundKeyInfo compoundKeyInfo) {
+ if (compoundKeyInfo == null) {
+ return;
+ }
+
+ keyValue = new ArrayList<Object>();
+
+ Schema schema = schemaContainer.getSchema(event.getClass());
+
+ // get the value for each keyInfo
+ for (KeyInfo keyInfo : compoundKeyInfo.getKeyInfoList()) {
+ Object value = null;
+ Object record = event;
+ List<?> list = null;
+ Property property = null;
+ for (KeyPathElement keyPathElement : keyInfo.getKeyPath()) {
+ if (keyPathElement instanceof KeyPathElementIndex) {
+ record = list.get(((KeyPathElementIndex) keyPathElement).getIndex());
+ schema = property.getComponentProperty().getSchema();
+ } else {
+ String keyPathElementName = ((KeyPathElementName) keyPathElement).getKeyName();
+ property = schema.getProperties().get(keyPathElementName);
+ value = null;
+ try {
+ value = property.getGetterMethod().invoke(record);
+ } catch (Exception e) {
+ Logger.getLogger("s4").error(e);
+ return;
+ }
+
+ if (value == null) {
+ Logger.getLogger("s4").error("Value for " + keyPathElementName + " is null!");
+ return;
+ }
+
+ if (property.getType().isPrimitive() || property.isNumber()
+ || property.getType().equals(String.class)) {
+ keyValue.add(value);
+ if (saveKeyRecord) {
+ if (keyRecord == null) {
+ keyRecord = new ArrayList<Object>();
+ }
+ keyRecord.add(record);
+ }
+ continue;
+ } else if (property.isList()) {
+ try {
+ list = (List) property.getGetterMethod().invoke(record);
+ } catch (Exception e) {
+ Logger.getLogger("s4").error(e);
+ return;
+ }
+ } else {
+ try {
+ record = property.getGetterMethod().invoke(record);
+ } catch (Exception e) {
+ Logger.getLogger("s4").error(e);
+ return;
+ }
+ schema = property.getSchema();
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * This method sets the output strategy to "by event count" and specifies
+ * how many events trigger a call to the <code>output</code> method.
+ * <p>
+ * You would not normally call this method directly, but instead via the S4
+ * configuration file.
+ * <p>
+ * After this method is called, AbstractProcessor will call your
+ * <code>output</code> method (implemented in your subclass) every
+ * <emp>outputFrequency</emph> events.
+ * <p>
+ * If you call neither <code>setOutputFrequencyByEventCount</code> nor
+ * <code>setOutputFrequencyByTimeBoundary</code>, the default strategy is
+ * "by event count" with an output frequency of 1. (That is,
+ * <code>output</code> is called after after each return from
+ * <code>processEvent</code>).
+ *
+ * @param outputFrequency
+ * the number of application events passed to
+ * <code>processEvent</code> before output is called.
+ **/
+ public void setOutputFrequencyByEventCount(int outputFrequency) {
+ this.outputFrequency = outputFrequency;
+ this.outputFrequencyType = FrequencyType.EVENTCOUNT;
+ initFrequency(PeriodicInvokerType.OUTPUT);
+ }
+
+ /**
+ * Sets the frequency strategy to "by event count". Uses the same mechanism
+ * than {@link #setOutputFrequencyByEventCount(int)}
+ *
+ * @param checkpointingFrequency
+ * the number of application events passed to
+ * <code>processEvent</code> before output is called (ignoring
+ * checkpointing events).
+ */
+ public void setCheckpointingFrequencyByEventCount(int checkpointingFrequency) {
+ this.checkpointingFrequency = checkpointingFrequency;
+ this.checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
+ supplementAdviceForCheckpointingAndRecovery();
+ }
+
+ /**
+ * This method sets the output strategy to "output on time boundary" and
+ * specifies the time boundary on which the <code>output</code> should be
+ * called.
+ * <p>
+ * You would not normally call this method directly, but instead via the S4
+ * configuration file.
+ * <p>
+ * <code>outputFrequency</code> specifies the time boundary in seconds.
+ * Whenever the current time is a multiple of <code>outputFrequency</code>,
+ * <code>AbstractProcessor</code> will call your <code>output</code> method.
+ * For example, if you specify an <code>outputFrequency</code> of 3600,
+ * <code>AbstractProcessor</code> will call <code>output</code> on every
+ * hour boundary (e.g., 11:00:00, 12:00:00, 13:00:00, etc.).
+ * <p>
+ * When this output strategy is used, your <code>output</code> method may
+ * occasionally (or frequently) run concurrently with your
+ * <code>processEvent</code> method. Therefore, you should take steps to
+ * protect any data structures that both methods use.
+ * <p>
+ * If you call neither <code>setOutputFrequencyByEventCount</code> nor
+ * <code>setOutputFrequencyByTimeBoundary</code>, the default strategy is
+ * "by event count" with an output frequency of 1. (That is,
+ * <code>output</code> is called after after each return from
+ * <code>processEvent</code>).
+ *
+ * @param outputFrequency
+ * the time boundary in seconds
+ **/
+ public void setOutputFrequencyByTimeBoundary(int outputFrequency) {
+ this.outputFrequency = outputFrequency;
+ this.outputFrequencyType = FrequencyType.TIMEBOUNDARY;
+ initFrequency(PeriodicInvokerType.OUTPUT);
+ }
+
+ /**
+ * Sets the frequency of checkpointing. It uses the same mechanism than
+ * {@link #setOutputFrequencyByTimeBoundary(int)}
+ *
+ * @param checkpointingFrequency
+ * the time boundary in seconds
+ */
+ public void setCheckpointingFrequencyByTimeBoundary(int checkpointingFrequency) {
+ this.checkpointingFrequency = checkpointingFrequency;
+ this.checkpointingFrequencyType = FrequencyType.TIMEBOUNDARY;
+ supplementAdviceForCheckpointingAndRecovery();
+ initFrequency(PeriodicInvokerType.CHECKPOINTING);
+ }
+
+ /**
+ * Set the offset from the time boundary at which
+ * <code>AbstractProcessor</code> should call <code>output</code>.
+ * <p>
+ * This value is honored only if the "output on time boundary" output
+ * strategy is used.
+ * <p>
+ * As an example, if you specify an <code>outputFrequency</code> of 3600 and
+ * an <code>outputFrequencyOffset</code> of 7,
+ * <code>AbstractProcessor</code> will call <code>output</code> on every
+ * hour boundary plus 7 seconds (e.g., 11:00:07, 12:00:07, 13:00:07, etc.).
+ **/
+ public void setOutputFrequencyOffset(int outputFrequencyOffset) {
+ this.outputFrequencyOffset = outputFrequencyOffset;
+ }
+
+ /**
+ * Set the offset from the time boundary at which calls to checkpoint should
+ * be performed. It uses the same mechanism than
+ * {@link AbstractPE#setOutputFrequencyOffset(int)}
+ *
+ * @param checkpointingFrequencyOffset
+ * checkpointing frequency offset in seconds
+ */
+ public void setCheckpointingFrequencyOffset(int checkpointingFrequencyOffset) {
+ this.checkpointingFrequencyOffset = checkpointingFrequencyOffset;
+ supplementAdviceForCheckpointingAndRecovery();
+ }
+
+ public void setKeys(String[] keys) {
+ for (String key : keys) {
+ StringTokenizer st = new StringTokenizer(key);
+ eventAdviceList.add(new EventAdvice(st.nextToken(), st.nextToken()));
+ }
+ supplementAdviceForCheckpointingAndRecovery();
+ }
+
+ private void initFrequency(PeriodicInvokerType type) {
+ Runnable r = null;
+ if (PeriodicInvokerType.OUTPUT.equals(type)) {
+ if (outputFrequency < 0) {
+ return;
+ }
+
+ if (outputFrequencyType == FrequencyType.TIMEBOUNDARY) {
+ // create a thread that calls output on time boundaries
+ // that are multiples of frequency
+ r = new PeriodicInvoker(type);
+
+ }
+ } else {
+ if (checkpointingFrequency < 0) {
+ return;
+ }
+ if (checkpointingFrequencyType == FrequencyType.TIMEBOUNDARY) {
+ r = new PeriodicInvoker(type);
+ }
+ }
+ if (r != null) {
+ Thread t = new Thread(r, type.getName());
+ t.start();
+ }
+ }
+
+ /**
+ * This method exists simply to make <code>clone()</code> public.
+ */
+ public Object clone() {
+ try {
+ Object clone = super.clone();
+ return clone;
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setTtl(int ttl) {
+ this.ttl = ttl;
+ }
+
+ /**
+ *
+ */
+ public int getTtl() {
+ return ttl;
+ }
+
+ public List<EventAdvice> advise() {
+ return eventAdviceList;
+ }
+
+ /**
+ *
+ */
+ public void setLookupTable(Persister lookupTable) {
+ this.lookupTable = lookupTable;
+ }
+
+ /**
+ * You implement this abstract method in your subclass. This is the part of
+ * your processor that outputs data (e.g., by writing the data to the
+ * cache). The <code>output</code> method may further process the data
+ * (e.g., aggregate it) before outputting it.
+ **/
+ abstract public void output();
+
+ protected void checkpoint() {
+
+ byte[] serializedState = serializeState();
+ // NOTE: assumes pe id is keyvalue from the PE...
+ saveState(getSafeKeeperId(), serializedState);
+ // remove dirty flag
+ checkpointable = false;
+ }
+
+ private void saveState(SafeKeeperId key, byte[] serializedState) {
+ safeKeeper.saveState(key, serializedState);
+ }
+
+ protected void recover() {
+ byte[] serializedState = null;
+ try {
+ serializedState = safeKeeper.fetchSerializedState(getSafeKeeperId());
+ } catch (RuntimeException e) {
+ Logger.getLogger("s4-ft").error("Cannot fetch serialized stated for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
+ }
+ if (serializedState == null) {
+ return;
+ }
+ try {
+ AbstractPE peInOldState = deserializeState(serializedState);
+ restoreState(peInOldState);
+ } catch (RuntimeException e) {
+ Logger.getLogger("s4-ft").error("Cannot restore state for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
+ }
+ }
+
+ public SafeKeeperId getSafeKeeperId() {
+ return new SafeKeeperId(getId(), getKeyValueString());
+ }
+
+ public void setSafeKeeper(SafeKeeper safeKeeper) {
+ this.safeKeeper = safeKeeper;
+ if (safeKeeper != null) {
+ this.safeKeeperSetSignal.countDown();
+ }
+ }
+
+ public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
+ isCheckpointingEvent = true;
+ if (isCheckpointable()) {
+ checkpoint();
+ }
+ }
+
+ protected boolean isCheckpointable() {
+ return checkpointable;
+ }
+
+ protected void setCheckpointable(boolean checkpointable) {
+ this.checkpointable = checkpointable;
+ }
+
+ public final void initiateCheckpoint() {
+ // enqueue checkpointing event
+ if (safeKeeper != null) {
+ safeKeeper.generateCheckpoint(this);
+ }
+ }
+
+ public byte[] serializeState() {
+ return safeKeeper.getSerializer().serialize(this);
+ }
+
+ public AbstractPE deserializeState(byte[] loadedState) {
+ return (AbstractPE) safeKeeper.getSerializer().deserialize(loadedState);
+ }
+
+ public void restoreState(AbstractPE oldState) {
+ restoreFieldsForClass(oldState.getClass(), oldState);
+ }
+
+ private void restoreFieldsForClass(Class currentInOldStateClassHierarchy, AbstractPE oldState) {
+ if (!AbstractPE.class.isAssignableFrom(currentInOldStateClassHierarchy)) {
+ return;
+ } else {
+ Field[] fields = oldState.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
+ if (!Modifier.isPublic(field.getModifiers())) {
+ field.setAccessible(true);
+ }
+ try {
+ // TODO use reflectasm
+ field.set(this, field.get(oldState));
+ } catch (IllegalArgumentException e) {
+ Logger.getLogger("s4-ft").error("Cannot recover old state for this PE [" + this + "]", e);
+ return;
+ } catch (IllegalAccessException e) {
+ Logger.getLogger("s4-ft").error("Cannot recover old state for this PE [" + this + "]", e);
+ return;
+ }
+
+ }
+ }
+ restoreFieldsForClass(currentInOldStateClassHierarchy.getSuperclass(), oldState);
+ }
+ }
+
+ /**
+ * Subscribes this PE to the checkpointing stream
+ */
+ private void supplementAdviceForCheckpointingAndRecovery() {
+ // don't do anything until both conditions are true
+ Logger.getLogger("s4").info(
+ "Maybe adding for " + this.getId() + ": " + checkpointingFrequency + " and " + eventAdviceList.size());
+ if (checkpointingFrequency > 0 && eventAdviceList.size() > 0) {
+ eventAdviceList.add(new EventAdvice(this.getId() + "_checkpointing", "key"));
+ }
+ }
+
+ public void processEvent(RecoveryEvent recoveryEvent) {
+ isCheckpointingEvent = true;
+ recover();
+ }
+
+ /**
+ * This method expires the current PE.
+ **/
+ protected void expire() {
+ this.prototypeWrapper.expire(this.keyValueString);
+ }
+
+ class PeriodicInvoker implements Runnable {
+
+ PeriodicInvokerType type;
+
+ public PeriodicInvoker(PeriodicInvokerType type) {
+ this.type = type;
+ }
+
+ public long getFrequencyInMillis() {
+ if (type.equals(PeriodicInvokerType.OUTPUT)) {
+ return outputFrequency * 1000;
+ } else {
+ return checkpointingFrequency * 1000;
+ }
+ }
+
+ public long getFrequencyOffset() {
+ if (type.equals(PeriodicInvokerType.OUTPUT)) {
+ return outputFrequencyOffset;
+ } else {
+ return checkpointingFrequencyOffset;
+ }
+ }
+
+ public void run() {
+ if (clock == null) {
+ try {
+ s4ClockSetSignal.await();
+ } catch (InterruptedException e) {
+ }
+ }
+ if (PeriodicInvokerType.CHECKPOINTING.equals(type) && safeKeeper == null) {
+ try {
+ safeKeeperSetSignal.await();
+ } catch (InterruptedException e) {
+ }
+ }
+
+ int outputCount = 0;
+ int checkpointCount = 0;
+ long frequencyInMillis = getFrequencyInMillis();
+
+ long currentTime = getCurrentTime();
+ while (!Thread.interrupted()) {
+ long currentBoundary = (currentTime / frequencyInMillis) * frequencyInMillis;
+ long nextBoundary = currentBoundary + frequencyInMillis;
+ currentTime = clock.waitForTime(nextBoundary + (outputFrequencyOffset * 1000));
+ if (lookupTable != null) {
+ Set peKeys = lookupTable.keySet();
+ for (Iterator it = peKeys.iterator(); it.hasNext();) {
+ String peKey = (String) it.next();
+ AbstractPE pe = null;
+ try {
+ pe = (AbstractPE) lookupTable.get(peKey);
+ } catch (InterruptedException ie) {
+ }
+
+ if (pe == null) {
+ continue;
+ }
+
+ if (PeriodicInvokerType.OUTPUT.equals(type)) {
+ try {
+ pe.output();
+ outputCount++;
+ } catch (Exception e) {
+ Logger.getLogger("s4").error("Exception calling output() method", e);
+ }
+
+ if (outputCount == outputsBeforePause) {
+ if (logPauses) {
+ Logger.getLogger("s4").info(
+ "Pausing " + getId() + " at count " + outputCount + " for "
+ + pauseTimeInMillis + " milliseconds");
+ }
+ outputCount = 0;
+ try {
+ Thread.sleep(pauseTimeInMillis);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } else if (PeriodicInvokerType.CHECKPOINTING.equals(type)) {
+ try {
+ if (pe.isCheckpointable()) {
+ pe.initiateCheckpoint();
+ checkpointCount++;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ Logger.getLogger("s4").error("Exception calling checkpoint() method", e);
+ }
+
+ if (checkpointCount == checkpointsBeforePause) {
+ if (logPauses) {
+ Logger.getLogger("s4").info(
+ "Pausing " + getId() + " at checkpoint count " + checkpointCount + " for "
+ + checkpointingPauseTimeInMillis + " milliseconds");
+ }
+ checkpointCount = 0;
+ try {
+ Thread.sleep(checkpointingPauseTimeInMillis);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ } // end for each pe in lookup table
+ } // end if lookup table is not null
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java
new file mode 100644
index 0000000..18f19e1
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractWindowingPE.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.util.SlotUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+public abstract class AbstractWindowingPE extends AbstractPE {
+ private String slotClassName;
+ private int slotSize = 3600; // default one hour
+ private int windowSize = slotSize * 24; // default, 24 hours
+ private Map<String, String> timestampFields;
+
+ private long lastTimestamp = -1;
+ private Map<Long, Slot> slots;
+ private SlotUtils slotUtils;
+ private Class slotClass;
+
+ public void setSlotClassName(String slotClassName) {
+ this.slotClassName = slotClassName;
+ }
+
+ public void setSlotSize(int slotSize) {
+ this.slotSize = slotSize;
+ }
+
+ public int getSlotSize() {
+ return slotSize;
+ }
+
+ public void setWindowSize(int windowSize) {
+ this.windowSize = windowSize;
+ }
+
+ public int getWindowSize() {
+ return windowSize;
+ }
+
+ public void setTimestampFields(String[] timestampFieldsArray) {
+ timestampFields = new HashMap<String, String>();
+ for (String timeStampFieldInfo : timestampFieldsArray) {
+ StringTokenizer st = new StringTokenizer(timeStampFieldInfo);
+ timestampFields.put(st.nextToken(), st.nextToken());
+ }
+ }
+
+ private OverloadDispatcherSlot overloadDispatcher;
+
+ public AbstractWindowingPE() {
+ }
+
+ public void init() {
+ // this reference will be shared amongst all instances of the pe
+ slotUtils = new SlotUtils(slotSize);
+
+ try {
+ slotClass = Class.forName(slotClassName);
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException(cnfe);
+ }
+
+ OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(slotClass,
+ true);
+ Class<?> overloadDispatcherClass = oldg.generate();
+
+ try {
+ overloadDispatcher = (OverloadDispatcherSlot) overloadDispatcherClass.newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void processEvent(Object event) {
+ long currentTime = getCurrentTime();
+ long maybeCurrentTime = -1;
+ if (timestampFields != null) {
+ Schema schema = schemaContainer.getSchema(event.getClass());
+ String fieldName = timestampFields.get(getStreamName());
+ if (fieldName != null) {
+ Property property = schema.getProperties().get(fieldName);
+ if (property != null
+ && (property.getType().equals(Long.TYPE) || property.getType()
+ .equals(Long.class))) {
+ try {
+ maybeCurrentTime = (Long) property.getGetterMethod()
+ .invoke(event);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ if (maybeCurrentTime > -1) {
+ currentTime = maybeCurrentTime;
+ lastTimestamp = currentTime;
+ }
+
+ long slotTime = slotUtils.getSlotAtTime(currentTime / 1000); // convert
+ // to
+ // seconds
+
+ if (slots == null) {
+ slots = Collections.synchronizedMap(new HashMap<Long, Slot>());
+ }
+
+ Slot slot = slots.get(slotTime);
+ if (slot == null) {
+ try {
+ slot = (Slot) slotClass.newInstance();
+ } catch (IllegalAccessException iae) {
+ throw new RuntimeException(iae);
+ } catch (InstantiationException ie) {
+ throw new RuntimeException(ie);
+ }
+ slots.put(slotTime, slot);
+ }
+
+ overloadDispatcher.dispatch(slot, event, slotTime, this);
+ }
+
+ public Map<Long, Slot> getSlots() {
+ pruneSlots(getCurrentTime() / 1000);
+
+ return Collections.unmodifiableMap(slots);
+ }
+
+ private void pruneSlots(long time) {
+ HashSet<Long> keys = new HashSet<Long>();
+
+ synchronized (slots) {
+ for (Long key : slots.keySet()) {
+ keys.add(key);
+ }
+ }
+
+ for (Long key : keys) {
+ if (slotUtils.isOutsideWindow(key, windowSize, time)) {
+ slots.remove(key);
+ }
+ }
+ }
+
+ public boolean isCurrentSlot(long slotTime) {
+ long currentSlot = slotUtils.getSlotAtTime(getCurrentTime() / 1000);
+ if (currentSlot == slotTime) {
+ return true;
+ }
+ return false;
+ }
+
+ public boolean isOutsideWindow(long time) {
+ Long slotIndexAtTime = slotUtils.getSlotAtTime(time/1000);
+ return slotUtils.isOutsideWindow(slotIndexAtTime, windowSize, getCurrentTime()/1000);
+ }
+
+ public Long getSlotAtOffset(int offset) {
+ return slotUtils.getSlot(offset, getCurrentTime() / 1000);
+ }
+
+ public Slot getSlotAtTime(long time) {
+ pruneSlots(getCurrentTime()/1000);
+ Long slotIndex = slotUtils.getSlotAtTime(time/1000);
+ return slots.get(slotIndex);
+ }
+
+ public Long getSlotTimeForTime(long time) {
+ return slotUtils.getSlotAtTime(time/1000);
+ }
+
+ public static interface Slot {
+ // public void processEvent(Object event, long slotTime,
+ // AbstractWindowingPE pe);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java b/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java
new file mode 100644
index 0000000..ce8dad5
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/AsynchronousEventProcessor.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.collector.EventWrapper;
+
+public interface AsynchronousEventProcessor {
+
+ void queueWork(EventWrapper eventWrapper);
+
+ // This will always be called by a different thread than the one executing
+ // run()
+ int getQueueSize();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java b/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java
new file mode 100644
index 0000000..077c693
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/ControlEventProcessor.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.message.PrototypeRequest;
+import org.apache.s4.message.Response;
+import org.apache.s4.message.SinglePERequest;
+
+import java.util.List;
+
+/**
+ * Processes control events.
+ */
+public class ControlEventProcessor {
+
+ private EventDispatcher dispatcher;
+
+ public void setDispatcher(EventDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public void process(EventWrapper e, PrototypeWrapper p) {
+ String en = e.getStreamName(); // e.g. "#joinPe01"
+ String pn = p.getId(); // e.g. "JoinPE01"
+
+ // stream name has to match PE's ID (modulo case).
+ // e.g. "#joinPe01" will match "JoinPE01"
+ if (!en.regionMatches(true, 1, pn, 0, pn.length()))
+ return;
+
+ execute(e, p);
+ }
+
+ protected void execute(EventWrapper e, PrototypeWrapper p) {
+ List<CompoundKeyInfo> keyInfoList = e.getCompoundKeys();
+ Object event = e.getEvent();
+
+ if (event instanceof SinglePERequest) {
+ // Handle Requests to individual PEs
+ if (keyInfoList.isEmpty())
+ return;
+
+ CompoundKeyInfo keyInfo = keyInfoList.get(0);
+
+ String keyVal = keyInfo.getCompoundValue();
+
+ AbstractPE pe = p.lookupPE(keyVal);
+
+ Response response = ((SinglePERequest) event).evaluate(pe);
+ String stream = response.getRInfo().getStream();
+
+ dispatcher.dispatchEvent(stream, response);
+
+ } else if (event instanceof PrototypeRequest) {
+ // Or handle aggregate requests to Prototypes.
+ Response response = ((PrototypeRequest) event).evaluate(p);
+ String stream = response.getRInfo().getStream();
+
+ dispatcher.dispatchEvent(stream, response);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java b/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java
new file mode 100644
index 0000000..51f318f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/EventAdvice.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+public class EventAdvice {
+ String eventName;
+ String key;
+
+ public EventAdvice(String eventName, String key) {
+ this.eventName = eventName;
+ this.key = key;
+ }
+
+ public String toString() {
+ return eventName + ":{" + key + "}";
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public String getEventName() {
+ return eventName;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java b/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java
new file mode 100644
index 0000000..4386699
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/JoinPE.java
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.log4j.Logger;
+
+public class JoinPE extends AbstractPE {
+ private static Logger logger = Logger.getLogger(JoinPE.class);
+ private Map<String, List<String>> eventFields = new HashMap<String, List<String>>();
+ private Map<String, Object> eventsToJoin;
+ private EventDispatcher dispatcher;
+ private Monitor monitor;
+ private String outputStreamName;
+ private String outputClassName;
+ private Class<?> outputClass;
+
+ public void setDispatcher(EventDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public void setMonitor(Monitor monitor) {
+ this.monitor = monitor;
+ }
+
+ public void setOutputStreamName(String outputStreamName) {
+ this.outputStreamName = outputStreamName;
+ }
+
+ public String getOutputClassName() {
+ return outputClassName;
+ }
+
+ public void setOutputClassName(String outputClassName) {
+ this.outputClassName = outputClassName;
+ try {
+ this.outputClass = Class.forName(this.outputClassName);
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException(cnfe);
+ }
+ }
+
+ public void setIncludeFields(String[] includeFields) {
+ for (String includeField : includeFields) {
+ StringTokenizer st = new StringTokenizer(includeField);
+ if (st.countTokens() != 2) {
+ Logger.getLogger("s4").error("Bad include field specified: "
+ + includeField);
+ continue;
+ }
+
+ String eventName = st.nextToken();
+ String fieldName = st.nextToken();
+
+ List<String> fieldNames = eventFields.get(eventName);
+ if (fieldNames == null) {
+ fieldNames = new ArrayList<String>();
+ eventFields.put(eventName, fieldNames);
+ }
+
+ if (fieldName.equals("*")) {
+ fieldNames.clear();
+ fieldNames.add("*");
+ } else {
+ fieldNames.add(fieldName);
+ }
+ }
+ }
+
+ @Override
+ public void output() {
+ // TODO Auto-generated method stub
+
+ }
+
+ private SchemaContainer schemaContainer = new SchemaContainer();
+
+ public void processEvent(Object event) {
+ if (eventsToJoin == null) {
+ eventsToJoin = new HashMap<String, Object>();
+ }
+ List<String> fieldNames = eventFields.get(getStreamName());
+ if (fieldNames == null) {
+ return;
+ }
+
+ // we only use the last event that comes through on the given stream
+ eventsToJoin.put(getStreamName(), event);
+
+ if (eventsToJoin.keySet().size() == eventFields.keySet().size()) {
+ Object newEvent = null;
+ try {
+ newEvent = outputClass.newInstance();
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
+ Schema newEventSchema = schemaContainer.getSchema(newEvent.getClass());
+
+ for (String streamName : eventsToJoin.keySet()) {
+ Object partialEvent = eventsToJoin.get(streamName);
+ Schema partialEventSchema = schemaContainer.getSchema(partialEvent.getClass());
+
+ List<String> includeFields = eventFields.get(streamName);
+ if (includeFields.size() == 1
+ && includeFields.get(0).equals("*")) {
+ for (Property partialEventProperty : partialEventSchema.getProperties()
+ .values()) {
+ copyField(partialEventProperty.getName(),
+ partialEventSchema,
+ newEventSchema,
+ partialEvent,
+ newEvent);
+ }
+ } else {
+ for (String includeField : includeFields) {
+ copyField(includeField,
+ partialEventSchema,
+ newEventSchema,
+ partialEvent,
+ newEvent);
+ }
+ }
+ }
+
+ dispatcher.dispatchEvent(outputStreamName, newEvent);
+ if (logger.isDebugEnabled()) {
+ logger.debug("STEP 7 (JoinPE): " + newEvent.toString());
+ }
+ }
+ }
+
+ private void copyField(String propertyName, Schema sourceSchema,
+ Schema targetSchema, Object source, Object target) {
+ Property sourceProperty = sourceSchema.getProperties()
+ .get(propertyName);
+ Property targetProperty = targetSchema.getProperties()
+ .get(propertyName);
+
+ if (sourceProperty == null || targetProperty == null
+ || !sourceProperty.getType().equals(targetProperty.getType())) {
+ throw new RuntimeException("Specified property " + propertyName
+ + " doesn't exist or is not consistent");
+ }
+
+ try {
+ Object sourceValue = sourceProperty.getGetterMethod()
+ .invoke(source);
+ if (sourceValue == null) {
+ return;
+ }
+ if (sourceProperty.getType().isPrimitive()) {
+ if (sourceValue instanceof Number) {
+ if (((Number) sourceValue).doubleValue() == 0.0) {
+ return;
+ }
+ }
+ if (sourceValue instanceof Boolean) {
+ if (((Boolean) sourceValue).equals(Boolean.FALSE)) {
+ return;
+ }
+ }
+ }
+ targetProperty.getSetterMethod().invoke(target, sourceValue);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java b/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java
new file mode 100644
index 0000000..f097977
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OutputFormatter.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+public interface OutputFormatter {
+ public Object format(Object outputValue);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java
new file mode 100644
index 0000000..4b62bf8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcher.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+public interface OverloadDispatcher {
+ public void dispatch(Object pe, Object event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java
new file mode 100644
index 0000000..9cd2994
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherGenerator.java
@@ -0,0 +1,313 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+import java.io.FileOutputStream;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.bcel.Constants;
+import org.apache.bcel.classfile.JavaClass;
+import org.apache.bcel.generic.BranchInstruction;
+import org.apache.bcel.generic.ClassGen;
+import org.apache.bcel.generic.ConstantPoolGen;
+import org.apache.bcel.generic.INSTANCEOF;
+import org.apache.bcel.generic.InstructionFactory;
+import org.apache.bcel.generic.InstructionHandle;
+import org.apache.bcel.generic.InstructionList;
+import org.apache.bcel.generic.MethodGen;
+import org.apache.bcel.generic.ObjectType;
+import org.apache.bcel.generic.Type;
+
+public class OverloadDispatcherGenerator {
+ private List<Hierarchy> hierarchies = new ArrayList<Hierarchy>();
+ private Class<?> targetClass;
+ private boolean forSlot = false;
+ private ObjectType abstractWindowingPEType = new ObjectType("org.apache.s4.processor.AbstractWindowingPE");
+ private String classDumpFile;
+
+ public void setClassDumpFile(String classDumpFile) {
+ this.classDumpFile = classDumpFile;
+ }
+
+ public OverloadDispatcherGenerator(Class targetClass) {
+ this(targetClass, false);
+ }
+
+ public OverloadDispatcherGenerator(Class targetClass, boolean forSlot) {
+ this.targetClass = targetClass;
+ this.forSlot = forSlot;
+
+ for (Method method : targetClass.getMethods()) {
+ if (method.getName().equals("processEvent")
+ && method.getReturnType().equals(Void.TYPE)) {
+ this.addHierarchy((Class<?>) method.getParameterTypes()[0]);
+ }
+ }
+ Collections.sort(hierarchies);
+ }
+
+ public void addHierarchy(Class<?> clazz) {
+ hierarchies.add(new Hierarchy(clazz));
+ }
+
+ public Class<Object> generate() {
+ Random rand = new Random(System.currentTimeMillis());
+ String dispatcherClassName = "OverloadDispatcher"
+ + (Math.abs(rand.nextInt() % 3256));
+
+ String interfaceName = "org.apache.s4.processor.OverloadDispatcher";
+ if (forSlot) {
+ interfaceName = "org.apache.s4.processor.OverloadDispatcherSlot";
+ }
+
+ ClassGen cg = new ClassGen(dispatcherClassName,
+ "java.lang.Object",
+ dispatcherClassName + ".java",
+ Constants.ACC_PUBLIC | Constants.ACC_SUPER,
+ new String[] { interfaceName });
+ ConstantPoolGen cp = cg.getConstantPool();
+ InstructionFactory instFactory = new InstructionFactory(cg, cp);
+
+ InstructionList il = new InstructionList();
+
+ // build constructor method for new class
+ MethodGen constructor = new MethodGen(Constants.ACC_PUBLIC,
+ Type.VOID,
+ Type.NO_ARGS,
+ new String[] {},
+ "<init>",
+ dispatcherClassName,
+ il,
+ cp);
+ il.append(InstructionFactory.createLoad(Type.OBJECT, 0));
+ il.append(instFactory.createInvoke("java.lang.Object",
+ "<init>",
+ Type.VOID,
+ Type.NO_ARGS,
+ Constants.INVOKESPECIAL));
+
+ il.append(InstructionFactory.createReturn(Type.VOID));
+ constructor.setMaxStack();
+ constructor.setMaxLocals();
+ cg.addMethod(constructor.getMethod());
+ il.dispose();
+
+ // build dispatch method
+ il = new InstructionList();
+
+ Type[] dispatchArgumentTypes = null;
+ String[] dispatchArgumentNames = null;
+ int postArgumentVariableSlot = 3;
+ if (forSlot) {
+ dispatchArgumentTypes = new Type[] { ObjectType.OBJECT,
+ ObjectType.OBJECT, ObjectType.LONG, abstractWindowingPEType };
+ dispatchArgumentNames = new String[] { "slot", "event", "slotTime",
+ "pe" };
+ postArgumentVariableSlot = 6;
+ } else {
+ dispatchArgumentTypes = new Type[] { ObjectType.OBJECT,
+ ObjectType.OBJECT };
+ dispatchArgumentNames = new String[] { "pe", "event" };
+
+ }
+
+ MethodGen method = new MethodGen(Constants.ACC_PUBLIC,
+ Type.VOID,
+ dispatchArgumentTypes,
+ dispatchArgumentNames,
+ "dispatch",
+ dispatcherClassName,
+ il,
+ cp);
+
+ List<InstructionHandle> targetInstructions = new ArrayList<InstructionHandle>();
+ List<BranchInstruction> branchInstructions = new ArrayList<BranchInstruction>();
+ List<BranchInstruction> gotoInstructions = new ArrayList<BranchInstruction>();
+
+ ObjectType peType = new ObjectType(targetClass.getName());
+
+ il.append(InstructionFactory.createLoad(Type.OBJECT, 1));
+ il.append(instFactory.createCheckCast(peType));
+ il.append(InstructionFactory.createStore(peType,
+ postArgumentVariableSlot));
+
+ for (int i = 0; i < hierarchies.size(); i++) {
+ Hierarchy hierarchy = hierarchies.get(i);
+
+ ObjectType hierarchyTop = new ObjectType(hierarchy.getTop()
+ .getName());
+
+ InstructionHandle ih = il.append(InstructionFactory.createLoad(Type.OBJECT,
+ 2));
+ if (i > 0) {
+ targetInstructions.add(ih);
+ }
+
+ il.append(new INSTANCEOF(cp.addClass(hierarchyTop)));
+ BranchInstruction bi = InstructionFactory.createBranchInstruction(Constants.IFEQ,
+ null);
+ il.append(bi);
+ branchInstructions.add(bi);
+
+ il.append(InstructionFactory.createLoad(peType,
+ postArgumentVariableSlot));
+ il.append(InstructionFactory.createLoad(hierarchyTop, 2));
+ il.append(instFactory.createCheckCast(hierarchyTop));
+ if (forSlot) {
+ il.append(InstructionFactory.createLoad(ObjectType.LONG, 3));
+ il.append(InstructionFactory.createLoad(abstractWindowingPEType,
+ 5));
+ }
+
+ Type[] argumentTypes = null;
+ if (forSlot) {
+ argumentTypes = new Type[] { hierarchyTop, ObjectType.LONG,
+ abstractWindowingPEType };
+ } else {
+ argumentTypes = new Type[] { hierarchyTop };
+ }
+ il.append(instFactory.createInvoke(targetClass.getName(),
+ "processEvent",
+ Type.VOID,
+ argumentTypes,
+ Constants.INVOKEVIRTUAL));
+
+ // no branch needed for last check
+ if (i < (hierarchies.size() - 1)) {
+ bi = InstructionFactory.createBranchInstruction(Constants.GOTO,
+ null);
+ il.append(bi);
+ gotoInstructions.add(bi);
+ }
+ }
+
+ InstructionHandle returnInstruction = il.append(InstructionFactory.createReturn(Type.VOID));
+
+ for (int i = 0; i < targetInstructions.size(); i++) {
+ branchInstructions.get(i).setTarget(targetInstructions.get(i));
+ }
+
+ branchInstructions.get(branchInstructions.size() - 1)
+ .setTarget(returnInstruction);
+
+ for (BranchInstruction gotoInstruction : gotoInstructions) {
+ gotoInstruction.setTarget(returnInstruction);
+ }
+
+ method.setMaxStack();
+ method.setMaxLocals();
+ cg.addMethod(method.getMethod());
+ il.dispose();
+
+ JavaClass jc = cg.getJavaClass();
+ OverloadDispatcherClassLoader cl = new OverloadDispatcherClassLoader();
+
+ // debug
+ if (classDumpFile != null) {
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(classDumpFile);
+ fos.write(jc.getBytes());
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if (fos != null)
+ try {
+ fos.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+
+ return cl.loadClassFromBytes(dispatcherClassName, jc.getBytes());
+
+ }
+
+ public static class Hierarchy implements Comparable<Hierarchy> {
+ private List<Class<?>> classes = new ArrayList<Class<?>>();
+
+ public Hierarchy(Class<?> clazz) {
+ for (Class<?> currentClass = clazz; currentClass != null; currentClass = (Class) currentClass.getSuperclass()) {
+ classes.add(currentClass);
+ }
+ }
+
+ public Class<?> getTop() {
+ if (classes.size() < 1) {
+ return null;
+ }
+ return classes.get(0);
+ }
+
+ public List<Class<?>> getClasses() {
+ return classes;
+ }
+
+ public boolean equals(Hierarchy other) {
+ if (classes.size() != other.classes.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < classes.size(); i++) {
+ if (!classes.get(i).equals(other.classes.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ public int compareTo(Hierarchy other) {
+ if (this.equals(other)) {
+ return 0;
+ } else if (this.containsClass(other.getTop())) {
+ return -1;
+ }
+
+ return 1;
+ }
+
+ private boolean containsClass(Class<?> other) {
+ for (Class<?> clazz : classes) {
+ if (clazz.equals(other)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ public static class OverloadDispatcherClassLoader extends URLClassLoader {
+ public OverloadDispatcherClassLoader() {
+ super(new URL[] {});
+ }
+
+ public Class loadClassFromBytes(String name, byte[] bytes) {
+ try {
+ return this.loadClass(name);
+ } catch (ClassNotFoundException cnfe) {
+ // expected
+ }
+ return this.defineClass(name, bytes, 0, bytes.length);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java
new file mode 100644
index 0000000..b443aa5
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/processor/OverloadDispatcherSlot.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.processor;
+
+public interface OverloadDispatcherSlot {
+ public void dispatch(Object slot, Object event, long slotTime, AbstractWindowingPE pe);
+}