You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC

[36/50] [abbrv] Rename packages in preparation for move to Apache

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java
new file mode 100644
index 0000000..7aeac4c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Partitioner;
+import org.apache.s4.dispatcher.partitioner.VariableKeyPartitioner;
+import org.apache.s4.dispatcher.transformer.Transformer;
+import org.apache.s4.emitter.EventEmitter;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * {@link EventDispatcher} that partitions each event and hands it to an
+ * {@link EventEmitter}.
+ * <p>
+ * For every dispatched event the configured {@link Partitioner}s are asked
+ * for {@link CompoundKeyInfo}s, the results are grouped by partition id, and
+ * one {@link EventWrapper} per partition id is emitted.
+ * <p>
+ * {@link #init()} starts a monitor thread that logs the event counters every
+ * {@value #MONITOR_INTERVAL_MS} ms and terminates the JVM when the configured
+ * config file is modified.
+ * <p>
+ * The counters and the per-partition {@code counts} array are only modified
+ * while holding this object's monitor, so concurrent calls to
+ * {@code dispatchEvent} do not lose updates.
+ */
+public class Dispatcher implements EventDispatcher {
+    public final static String PARTITION_INFO_KEY = "S4__PartitionInfo";
+
+    /** Pause between two passes of the monitor thread, in milliseconds. */
+    private static final long MONITOR_INTERVAL_MS = 15000;
+
+    private EventEmitter eventEmitter;
+    // Stored by the setter but not read anywhere in this class.
+    private Transformer[] transformers = new Transformer[0];
+    private Partitioner[] partitioners = new Partitioner[0];
+    private String configFilename;
+    // Stored by the setter but not read anywhere in this class.
+    private boolean debug = false;
+    private String loggerName = "s4";
+
+    /** Number of events that reached the partitioning stage. */
+    private volatile int eventCount = 0;
+    /** Number of events handed to this dispatcher, including dropped ones. */
+    private volatile int rawEventCount = 0;
+
+    /**
+     * Number of emitted events per partition id. Created on the first
+     * dispatch that sees a positive node count; guarded by {@code this}.
+     */
+    int counts[];
+
+    public Dispatcher() {
+
+    }
+
+    public void setTransformers(Transformer[] transformers) {
+        this.transformers = transformers;
+    }
+
+    public void setPartitioners(Partitioner[] partitioners) {
+        this.partitioners = partitioners;
+    }
+
+    public void setEventEmitter(EventEmitter eventEmitter) {
+        this.eventEmitter = eventEmitter;
+    }
+
+    public EventEmitter getEventEmitter() {
+        return this.eventEmitter;
+    }
+
+    public void setConfigFilename(String configFilename) {
+        this.configFilename = configFilename;
+    }
+
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    public void setLoggerName(String loggerName) {
+        this.loggerName = loggerName;
+    }
+
+    /**
+     * Starts the monitor thread. Each pass logs the event counters with their
+     * rates, logs the per-partition counts, and checks the config file; the
+     * thread stops once it is interrupted.
+     */
+    public void init() {
+
+        Runnable r = new Runnable() {
+            // Modification time seen on the first successful check; -1 until then.
+            private long configFileTime = -1;
+
+            public void run() {
+                long lastCheckTime = System.currentTimeMillis();
+                int lastEventCount = eventCount;
+                int lastRawEventCount = rawEventCount;
+                while (!Thread.currentThread().isInterrupted()) {
+                    // Read each counter exactly once per pass so that the
+                    // logged value, the rate and the next baseline agree.
+                    int currentEventCount = eventCount;
+                    int currentRawEventCount = rawEventCount;
+                    long currentTime = System.currentTimeMillis();
+                    double elapsedSeconds = (currentTime - lastCheckTime) / 1000.0;
+                    double rate = ratePerSecond(currentEventCount - lastEventCount,
+                                                elapsedSeconds);
+                    double rawRate = ratePerSecond(currentRawEventCount
+                            - lastRawEventCount, elapsedSeconds);
+                    lastCheckTime = currentTime;
+                    lastEventCount = currentEventCount;
+                    lastRawEventCount = currentRawEventCount;
+
+                    Logger logger = Logger.getLogger(loggerName);
+                    logger.info("Event count is " + currentEventCount
+                            + "; rate " + rate);
+                    logger.info("Raw event count is " + currentRawEventCount
+                            + "; rate " + rawRate);
+                    int[] countsSnapshot = snapshotCounts();
+                    if (countsSnapshot != null) {
+                        for (int i = 0; i < countsSnapshot.length; i++) {
+                            logger.info(i + ": " + countsSnapshot[i]);
+                        }
+                    }
+
+                    configCheck();
+
+                    try {
+                        Thread.sleep(MONITOR_INTERVAL_MS);
+                    } catch (InterruptedException ie) {
+                        // Restore the flag so the loop condition ends the thread.
+                        Thread.currentThread().interrupt();
+                    }
+                }
+
+            }
+
+            /**
+             * Exits the JVM with status 4 when the config file's modification
+             * time is later than the one recorded by the first check. Does
+             * nothing when no file name is configured or the file is missing.
+             */
+            private void configCheck() {
+                if (configFilename == null) {
+                    return;
+                }
+
+                File file = new File(configFilename);
+                if (!file.exists()) {
+                    return;
+                }
+                long lastModified = file.lastModified();
+                if (configFileTime == -1) {
+                    configFileTime = lastModified;
+                    return;
+                }
+
+                if (lastModified > configFileTime) {
+                    Logger.getLogger(loggerName)
+                          .info("Config file has changed. Exiting!!");
+                    System.exit(4);
+                }
+            }
+        };
+        Thread t = new Thread(r);
+        t.start();
+    }
+
+    /**
+     * Returns {@code delta / elapsedSeconds}, or 0 when no time has elapsed.
+     * The guard avoids logging NaN or Infinity on the first monitor pass,
+     * which runs immediately after the baseline was taken.
+     */
+    private static double ratePerSecond(int delta, double elapsedSeconds) {
+        if (elapsedSeconds <= 0) {
+            return 0.0;
+        }
+        return delta / elapsedSeconds;
+    }
+
+    /**
+     * Returns a copy of the per-partition counts taken under the lock, or
+     * {@code null} when the array has not been created yet.
+     */
+    private synchronized int[] snapshotCounts() {
+        return counts == null ? null : counts.clone();
+    }
+
+    @Override
+    public void dispatchEvent(String streamName,
+                              List<List<String>> compoundKeyNames, Object event) {
+        dispatchEvent(streamName, event, true, compoundKeyNames);
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        dispatchEvent(streamName, event, false, null);
+    }
+
+    /**
+     * Partitions {@code event} and emits one wrapper per target partition.
+     * The event is counted but otherwise dropped when the emitter reports no
+     * nodes. Exceptions raised while partitioning or emitting are logged and
+     * not propagated.
+     *
+     * @param streamName
+     *            name of stream to dispatch on
+     * @param event
+     *            object to dispatch
+     * @param variableKey
+     *            when {@code true}, only partitioners that are
+     *            {@link VariableKeyPartitioner}s are used and they receive
+     *            {@code compoundKeyNames}
+     * @param compoundKeyNames
+     *            keys to use for dispatching; only read when
+     *            {@code variableKey} is {@code true}
+     */
+    private void dispatchEvent(String streamName, Object event,
+                               boolean variableKey,
+                               List<List<String>> compoundKeyNames) {
+        synchronized (this) {
+            rawEventCount++;
+        }
+
+        // One snapshot per dispatch, so every partitioner sees the same count.
+        final int nodeCount = eventEmitter.getNodeCount();
+        if (nodeCount <= 0) {
+            return;
+        }
+
+        synchronized (this) {
+            if (counts == null) {
+                counts = new int[nodeCount];
+            }
+            eventCount++;
+        }
+
+        try {
+            List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+            for (Partitioner partitioner : partitioners) {
+                List<CompoundKeyInfo> pInfoList = null;
+
+                if (!variableKey) {
+                    pInfoList = partitioner.partition(streamName,
+                                                      event,
+                                                      nodeCount);
+                } else if (partitioner instanceof VariableKeyPartitioner) {
+                    VariableKeyPartitioner vp = (VariableKeyPartitioner) partitioner;
+                    pInfoList = vp.partition(streamName,
+                                             compoundKeyNames,
+                                             event,
+                                             nodeCount);
+                }
+
+                if (pInfoList != null) {
+                    partitionInfoList.addAll(pInfoList);
+                }
+            }
+
+            // Group the key infos by the partition they were assigned to.
+            Map<Integer, List<CompoundKeyInfo>> pInfoMap = new HashMap<Integer, List<CompoundKeyInfo>>();
+            for (CompoundKeyInfo partitionInfo : partitionInfoList) {
+                int partitionId = partitionInfo.getPartitionId();
+                List<CompoundKeyInfo> listByPartitionNumber = pInfoMap.get(partitionId);
+                if (listByPartitionNumber == null) {
+                    listByPartitionNumber = new ArrayList<CompoundKeyInfo>();
+                    pInfoMap.put(partitionId, listByPartitionNumber);
+                }
+                listByPartitionNumber.add(partitionInfo);
+            }
+
+            for (Map.Entry<Integer, List<CompoundKeyInfo>> entry : pInfoMap.entrySet()) {
+                int partitionId = entry.getKey();
+                EventWrapper eventWrapper = new EventWrapper(streamName,
+                                                             event,
+                                                             entry.getValue());
+                synchronized (this) {
+                    counts[partitionId]++;
+                }
+                eventEmitter.emit(partitionId, eventWrapper);
+            }
+        } catch (Exception e) {
+            Logger.getLogger(loggerName)
+                  .error("Exception in processEvent on thread "
+                                 + Thread.currentThread().getId() + " at time "
+                                 + System.currentTimeMillis(),
+                         e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java
new file mode 100644
index 0000000..12099ba
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.List;
+
+public interface EventDispatcher {
+
+    /**
+     * Dispatch event using stream name. Partitioners may be used to partition
+     * the event, possibly based on a pre-determined set of fixed named keys.
+     * 
+     * @param streamName
+     *            name of stream to dispatch on
+     * @param event
+     *            object to dispatch
+     */
+    void dispatchEvent(String streamName, Object event);
+
+    /**
+     * Dispatch event using a stream name and using a set of named keys.
+     * VariableKeyPartitioners may be used to partition the event.
+     * 
+     * @param streamName
+     *            name of stream to dispatch on
+     * @param compoundKeyNames
+     *            keys to use for dispatching
+     * @param event
+     *            object to dispatch
+     */
+    void dispatchEvent(String streamName, List<List<String>> compoundKeyNames,
+                       Object event);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java
new file mode 100644
index 0000000..543810e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.List;
+
+/**
+ * {@link EventDispatcher} that forwards every event to each dispatcher of a
+ * configured array, in array order. Nothing is forwarded while no array has
+ * been set.
+ */
+public class MultiDispatcher implements EventDispatcher {
+
+    private EventDispatcher[] dispatchers = null;
+
+    public void setDispatchers(EventDispatcher[] dispatchers) {
+        this.dispatchers = dispatchers;
+    }
+
+    /**
+     * Passes the event to every configured dispatcher.
+     */
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        final EventDispatcher[] targets = dispatchers;
+        if (targets == null) {
+            return;
+        }
+        for (int i = 0; i < targets.length; i++) {
+            targets[i].dispatchEvent(streamName, event);
+        }
+    }
+
+    /**
+     * Passes the event and its key names to every configured dispatcher.
+     */
+    @Override
+    public void dispatchEvent(String streamName,
+                              List<List<String>> compoundKeyNames, Object event) {
+        final EventDispatcher[] targets = dispatchers;
+        if (targets == null) {
+            return;
+        }
+        for (int i = 0; i < targets.length; i++) {
+            targets[i].dispatchEvent(streamName, compoundKeyNames, event);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java
new file mode 100644
index 0000000..bf5e97b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * {@link EventDispatcher} that forwards events to a wrapped dispatcher unless
+ * the stream name is in the configured set. With no set configured, every
+ * event is forwarded; with no wrapped dispatcher, nothing is.
+ */
+public class StreamExcludingDispatcher implements EventDispatcher {
+
+    private EventDispatcher dispatcher = null;
+    private HashSet<String> streams = null;
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void setStreams(String[] streams) {
+        List<String> names = Arrays.asList(streams);
+        this.streams = new HashSet<String>(names);
+    }
+
+    /** Tells whether events on the given stream must be held back. */
+    private boolean isExcluded(String streamName) {
+        return streams != null && streams.contains(streamName);
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        if (dispatcher == null || isExcluded(streamName)) {
+            return;
+        }
+        dispatcher.dispatchEvent(streamName, event);
+    }
+
+    @Override
+    public void dispatchEvent(String streamName,
+                              List<List<String>> compoundKeyNames, Object event) {
+        if (dispatcher == null || isExcluded(streamName)) {
+            return;
+        }
+        dispatcher.dispatchEvent(streamName, compoundKeyNames, event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java
new file mode 100644
index 0000000..4ea7795
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+/**
+ * {@link EventDispatcher} that forwards an event to a wrapped dispatcher only
+ * when its stream name is in the configured set. With no set or no wrapped
+ * dispatcher configured, nothing is forwarded.
+ */
+public class StreamSelectingDispatcher implements EventDispatcher {
+
+    private EventDispatcher dispatcher = null;
+    private HashSet<String> streams = null;
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void setStreams(String[] streams) {
+        List<String> names = Arrays.asList(streams);
+        this.streams = new HashSet<String>(names);
+    }
+
+    /** Tells whether an event on the given stream should be passed on. */
+    private boolean shouldForward(String streamName) {
+        if (dispatcher == null) {
+            return false;
+        }
+        if (streams == null) {
+            return false;
+        }
+        return streams.contains(streamName);
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        if (shouldForward(streamName)) {
+            dispatcher.dispatchEvent(streamName, event);
+        }
+    }
+
+    @Override
+    public void dispatchEvent(String streamName,
+                              List<List<String>> compoundKeyNames, Object event) {
+        if (shouldForward(streamName)) {
+            dispatcher.dispatchEvent(streamName, compoundKeyNames, event);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java
new file mode 100644
index 0000000..b3f23d3
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Partition events to be sent to all parts. This can be used in conjunction
+ * with a Dispatcher to broadcast events.
+ */
+public class BroadcastPartitioner implements Partitioner {
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName, Object event,
+                                           int partitionCount) {
+
+        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+
+        // assign to all partitions
+        for (int i = 0; i < partitionCount; ++i) {
+            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+
+            partitionInfo.setPartitionId(i);
+
+            partitionInfoList.add(partitionInfo);
+        }
+
+        return partitionInfoList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java
new file mode 100644
index 0000000..f7b68f8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Describes one compound key of an event: the {@link KeyInfo}s it is made of,
+ * its textual key and value, and the id of the partition it was assigned to.
+ * <p>
+ * NOTE(review): the class is {@link Serializable} but declares no
+ * {@code serialVersionUID}; confirm whether serialized instances must stay
+ * compatible across versions before adding one.
+ */
+public class CompoundKeyInfo implements Serializable {
+    List<KeyInfo> keyInfoList = new ArrayList<KeyInfo>();
+    /** Target partition; -1 until {@link #setPartitionId(int)} is called. */
+    int partitionId = -1;
+    String compoundValue;
+    /** Explicitly set key, or the key lazily built by {@link #getCompoundKey()}. */
+    String compoundKey;
+
+    public CompoundKeyInfo() {
+    }
+
+    public void addKeyInfo(KeyInfo keyInfo) {
+        keyInfoList.add(keyInfo);
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public void setCompoundKey(String compoundKey) {
+        this.compoundKey = compoundKey;
+    }
+
+    public void setCompoundValue(String compoundValue) {
+        this.compoundValue = compoundValue;
+    }
+
+    public List<KeyInfo> getKeyInfoList() {
+        return keyInfoList;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    /**
+     * Returns the compound key. When none has been set, it is built by
+     * joining the string form of every {@link KeyInfo} with commas, and the
+     * result is stored. Because the result is stored, {@link KeyInfo}s added
+     * after the first call are not reflected in later calls.
+     *
+     * @return the explicit or computed compound key; an empty string when no
+     *         key was set and there are no {@link KeyInfo}s
+     */
+    public String getCompoundKey() {
+        if (compoundKey == null) {
+            // The buffer never leaves this method, so the unsynchronized
+            // StringBuilder is sufficient.
+            StringBuilder compoundKeyBuilder = new StringBuilder();
+            for (KeyInfo keyInfo : this.getKeyInfoList()) {
+                if (compoundKeyBuilder.length() > 0) {
+                    compoundKeyBuilder.append(",");
+                }
+                compoundKeyBuilder.append(keyInfo.toString());
+            }
+            compoundKey = compoundKeyBuilder.toString();
+        }
+        return compoundKey;
+    }
+
+    public String getCompoundValue() {
+        return this.compoundValue;
+    }
+
+    /**
+     * Renders this object as <code>{key = value}:partitionId</code>. Calling
+     * it computes and stores the compound key if none has been set.
+     */
+    @Override
+    public String toString() {
+        return "{" + getCompoundKey() + " = " + getCompoundValue() + "}:"
+                + getPartitionId();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java
new file mode 100644
index 0000000..0076154
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+/**
+ * {@link Hasher} that hashes the string form of a key with
+ * {@link HashAlgorithm#FNV1_64_HASH}.
+ */
+public class DefaultHasher implements Hasher {
+
+    HashAlgorithm hashAlgorithm = HashAlgorithm.FNV1_64_HASH;
+
+    /**
+     * Converts {@code hashKey} to text with {@link String#valueOf(Object)}
+     * and returns the hash the algorithm computes for that text.
+     */
+    @Override
+    public long hash(Object hashKey) {
+        final String keyText = String.valueOf(hashKey);
+        return hashAlgorithm.hash(keyText);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
new file mode 100644
index 0000000..ae2faca
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
@@ -0,0 +1,351 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+public class DefaultPartitioner implements Partitioner, VariableKeyPartitioner {
+    private List<List<String>> keyNameTuple = new ArrayList<List<String>>();
+    private boolean debug = false;
+    private Hasher hasher;
+    private Set<String> streamNameSet;
+    private String delimiter = ":";
+    private boolean fastPath = false;
+
+    public void setDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    public void setHashKey(String[] simpleKeyStrings) {
+        for (String simpleKeyAsString : simpleKeyStrings) {
+            List<String> keyNameElements = new ArrayList<String>();
+            StringTokenizer st = new StringTokenizer(simpleKeyAsString, "/");
+            while (st.hasMoreTokens()) {
+                keyNameElements.add(st.nextToken());
+            }
+            keyNameTuple.add(keyNameElements);
+        }
+    }
+
+    public void setStreamNames(String[] streamNames) {
+        streamNameSet = new HashSet<String>(streamNames.length);
+        for (String eventType : streamNames) {
+            streamNameSet.add(eventType);
+        }
+    }
+
+    public void setHasher(Hasher hasher) {
+        this.hasher = hasher;
+    }
+
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    private SchemaContainer schemaContainer = new SchemaContainer();
+
+    public List<CompoundKeyInfo> partition(String streamName, Object event,
+                                           int partitionCount) {
+        return partition(streamName, keyNameTuple, event, partitionCount);
+    }
+
+    public List<CompoundKeyInfo> partition(String streamName,
+                                           List<List<String>> compoundKeyNames,
+                                           Object event, int partitionCount) {
+
+        if (streamName != null && streamNameSet != null
+                && !streamNameSet.contains(streamName)) {
+            return null;
+        }
+
+        // Some event types that need special handling
+        if (event instanceof org.apache.s4.message.Request) {
+            // construct key from request's target
+            org.apache.s4.message.Request r = (io.s4.message.Request) event;
+            return r.partition(hasher, delimiter, partitionCount);
+
+        } else if (event instanceof org.apache.s4.message.Response) {
+            // partition id is encoded in Response, so use it directly.
+            org.apache.s4.message.Response r = (io.s4.message.Response) event;
+            return r.partition(partitionCount);
+
+        } else if (compoundKeyNames == null) {
+            // if compoundKeyNames is null, then assign to a random partition.
+            return partitionRandom(partitionCount);
+        }
+
+        // have to compute key value and
+        // partition based on hash of that value
+
+        Schema schema = schemaContainer.getSchema(event.getClass());
+
+        if (debug) {
+            System.out.println(schema);
+        }
+
+        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+
+        // fast path for single top-level key
+        if (fastPath
+                || (compoundKeyNames.size() == 1 && compoundKeyNames.get(0)
+                                                                    .size() == 1)) {
+            String simpleKeyName = compoundKeyNames.get(0).get(0);
+            if (debug) {
+                System.out.println("Using fast path!");
+            }
+            fastPath = true;
+            KeyInfo keyInfo = new KeyInfo();
+            Property property = schema.getProperties().get(simpleKeyName);
+            if (property == null) {
+                return null;
+            }
+
+            Object value = null;
+            try {
+                value = property.getGetterMethod().invoke(event);
+            } catch (Exception e) {
+                if (debug) {
+                    e.printStackTrace();
+                }
+            }
+
+            if (value == null) {
+                if (debug) {
+                    System.out.println("Fast path: Null value encountered");
+                }
+                return null;
+            }
+            keyInfo.addElementToPath(simpleKeyName);
+            String stringValue = String.valueOf(value);
+            keyInfo.setValue(stringValue);
+            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+            partitionInfo.addKeyInfo(keyInfo);
+            int partitionId = (int) (hasher.hash(stringValue) % partitionCount);
+            partitionInfo.setPartitionId(partitionId);
+            partitionInfo.setCompoundValue(stringValue);
+            partitionInfoList.add(partitionInfo);
+            if (debug) {
+                System.out.printf("Value %s, partition id %d\n",
+                                  stringValue,
+                                  partitionInfo.getPartitionId());
+            }
+            return partitionInfoList;
+        }
+
+        List<List<KeyInfo>> valueLists = new ArrayList<List<KeyInfo>>();
+        int maxSize = 0;
+
+        for (List<String> simpleKeyPath : compoundKeyNames) {
+            List<KeyInfo> keyInfoList = new ArrayList<KeyInfo>();
+            KeyInfo keyInfo = new KeyInfo();
+            keyInfoList = getKeyValues(event,
+                                       schema,
+                                       simpleKeyPath,
+                                       0,
+                                       keyInfoList,
+                                       keyInfo);
+            if (keyInfoList == null || keyInfoList.size() == 0) {
+                if (debug) {
+                    System.out.println("Null value encountered");
+                }
+                return null; // do no partitioning if any simple key's value
+                             // resolves to null
+            }
+            valueLists.add(keyInfoList);
+            maxSize = Math.max(maxSize, keyInfoList.size());
+
+            if (debug) {
+                printKeyInfoList(keyInfoList);
+            }
+        }
+
+        for (int i = 0; i < maxSize; i++) {
+            String compoundValue = "";
+            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+            for (List<KeyInfo> keyInfoList : valueLists) {
+                if (i < keyInfoList.size()) {
+                    compoundValue += (compoundValue.length() > 0 ? delimiter
+                            : "") + keyInfoList.get(i).getValue();
+                    partitionInfo.addKeyInfo(keyInfoList.get(i));
+                } else {
+                    compoundValue += (compoundValue.length() > 0 ? delimiter
+                            : "")
+                            + keyInfoList.get(keyInfoList.size() - 1)
+                                         .getValue();
+                    partitionInfo.addKeyInfo(keyInfoList.get(keyInfoList.size() - 1));
+                }
+            }
+
+            // get the partition id
+            int partitionId = (int) (hasher.hash(compoundValue) % partitionCount);
+            partitionInfo.setPartitionId(partitionId);
+            partitionInfo.setCompoundValue(compoundValue);
+            partitionInfoList.add(partitionInfo);
+            if (debug) {
+                System.out.printf("Value %s, partition id %d\n",
+                                  compoundValue,
+                                  partitionInfo.getPartitionId());
+            }
+        }
+
+        return partitionInfoList;
+    }
+
+    // Assign to random partition
+    private List<CompoundKeyInfo> partitionRandom(int partitionCount) {
+        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+
+        // choose a random int from [0, partitionCount-1]
+        int partitionId = (int) Math.min(partitionCount - 1,
+                                         Math.floor(Math.random()
+                                                 * partitionCount));
+
+        partitionInfo.setPartitionId(partitionId);
+        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+        partitionInfoList.add(partitionInfo);
+
+        return partitionInfoList;
+    }
+
+    private void printKeyInfoList(List<KeyInfo> keyInfoList) {
+        for (KeyInfo aKeyInfo : keyInfoList) {
+            System.out.printf("Path: %s; full path %s; value %s\n",
+                              aKeyInfo.toString(),
+                              aKeyInfo.toString(true),
+                              aKeyInfo.getValue());
+        }
+    }
+
+    private List<KeyInfo> getKeyValues(Object record, Schema schema,
+                                       List<String> keyNameElements,
+                                       int elementIndex,
+                                       List<KeyInfo> keyInfoList,
+                                       KeyInfo keyInfo) {
+        String keyElement = keyNameElements.get(elementIndex);
+        Property property = schema.getProperties().get(keyElement);
+        if (property == null) {
+            return null;
+        }
+
+        keyInfo.addElementToPath(keyElement);
+
+        Object value = null;
+        try {
+            value = property.getGetterMethod().invoke(record);
+        } catch (Exception e) {
+            if (debug) {
+                System.out.println("key element is " + keyElement);
+                e.printStackTrace();
+            }
+        }
+
+        if (value == null) {
+            return null; // return a null KeyInfo list if we hit a null value
+        }
+        if (property.isList()) {
+            List list = (List) value;
+            // TODO: handle case where key does not include property of
+            // component type
+            Schema componentSchema = property.getComponentProperty()
+                                             .getSchema();
+            int listLength = list.size();
+            for (int i = 0; i < listLength; i++) {
+                Object listEntry = list.get(i);
+                KeyInfo keyInfoForListEntry = keyInfo.copy();
+                keyInfoForListEntry.addElementToPath(i);
+                Object partialList = getKeyValues(listEntry,
+                                                  componentSchema,
+                                                  keyNameElements,
+                                                  elementIndex + 1,
+                                                  keyInfoList,
+                                                  keyInfoForListEntry);
+                if (partialList == null) {
+                    return null;
+                }
+            }
+        } else if (property.getSchema() != null) {
+            return getKeyValues(value,
+                                property.getSchema(),
+                                keyNameElements,
+                                elementIndex + 1,
+                                keyInfoList,
+                                keyInfo);
+        } else {
+            keyInfo.setValue(String.valueOf(value));
+            keyInfoList.add(keyInfo);
+        }
+
+        return keyInfoList;
+    }
+
+    public static void main(String args[]) {
+        DefaultPartitioner dp1 = new DefaultPartitioner();
+        DefaultPartitioner dp2 = new DefaultPartitioner();
+        dp1.setDebug(true);
+        dp1.setHashKey(new String[] { "array1/val1", "array1/val2", "query" });
+        dp1.setHasher(new DefaultHasher());
+
+        dp2.setDebug(true);
+        dp2.setHashKey(new String[] { "user" });
+        dp2.setHasher(new DefaultHasher());
+
+        Map<String, Object> event = new HashMap<String, Object>();
+        event.put("user", "fred");
+        event.put("query", "timex watch");
+        List<Map<String, Object>> array1 = new ArrayList<Map<String, Object>>();
+        Map<String, Object> element = new HashMap<String, Object>();
+        element.put("val1", new Long(72));
+        element.put("val2", new Long(11));
+        array1.add(element);
+        element = new HashMap<String, Object>();
+        element.put("val1", new Long(21));
+        element.put("val2", new Long(12));
+        array1.add(element);
+        event.put("array1", array1);
+
+        dp1.partition("test", event, 4);
+        System.out.println("------------");
+        dp2.partition("test", event, 4);
+        System.out.println("------------");
+        event = new HashMap<String, Object>();
+
+        event.put("query", "timex watch");
+        array1 = new ArrayList<Map<String, Object>>();
+        element = new HashMap<String, Object>();
+        element.put("val1", new Long(72));
+        element.put("val2", new Long(11));
+        array1.add(element);
+        element = new HashMap<String, Object>();
+
+        element.put("val2", new Long(12));
+        array1.add(element);
+        event.put("array1", array1);
+
+        dp1.partition("test", event, 4);
+        System.out.println("------------");
+        dp2.partition("test", event, 4);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java
new file mode 100644
index 0000000..5384ecd
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Partitioner that ignores the stream, the event and the partition count and
+ * always selects partition 0.
+ */
+public class DummyPartitioner implements Partitioner {
+
+    /**
+     * @return a new single-element list whose entry has partition id 0
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount) {
+        List<CompoundKeyInfo> result = new ArrayList<CompoundKeyInfo>();
+
+        CompoundKeyInfo firstPartition = new CompoundKeyInfo();
+        firstPartition.setPartitionId(0);
+        result.add(firstPartition);
+
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java
new file mode 100644
index 0000000..1704431
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import org.apache.s4.util.KeyUtil;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.zip.CRC32;
+
+/** from 
+ * http://github.com/dustin/java-memcached-client/blob/master/src/main/java/net/spy/memcached/HashAlgorithm.java
+ */
+
+/**
+ * Known hashing algorithms for locating a server for a key. Note that all hash
+ * algorithms return 64-bits of hash, but only the lower 32-bits are
+ * significant. This allows a positive 32-bit number to be returned for all
+ * cases.
+ */
+public enum HashAlgorithm {
+
+    /**
+     * Native hash (String.hashCode()).
+     */
+    NATIVE_HASH,
+    /**
+     * CRC32_HASH as used by the perl API. This will be more consistent both
+     * across multiple API users as well as java versions, but is mostly likely
+     * significantly slower.
+     */
+    CRC32_HASH,
+    /**
+     * FNV hashes are designed to be fast while maintaining a low collision
+     * rate. The FNV speed allows one to quickly hash lots of data while
+     * maintaining a reasonable collision rate.
+     * 
+     * @see <a href="http://www.isthe.com/chongo/tech/comp/fnv/">fnv
+     *      comparisons</a>
+     * @see <a href="http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash">fnv at
+     *      wikipedia</a>
+     */
+    FNV1_64_HASH,
+    /**
+     * Variation of FNV.
+     */
+    FNV1A_64_HASH,
+    /**
+     * 32-bit FNV1.
+     */
+    FNV1_32_HASH,
+    /**
+     * 32-bit FNV1a.
+     */
+    FNV1A_32_HASH,
+    /**
+     * MD5-based hash algorithm used by ketama.
+     */
+    KETAMA_HASH;
+
+    private static final long FNV_64_INIT = 0xcbf29ce484222325L;
+    private static final long FNV_64_PRIME = 0x100000001b3L;
+
+    private static final long FNV_32_INIT = 2166136261L;
+    private static final long FNV_32_PRIME = 16777619;
+
+    /**
+     * Compute the hash for the given key.
+     * 
+     * @return a positive integer hash
+     */
+    public long hash(final String k) {
+        long rv = 0;
+        switch (this) {
+        case NATIVE_HASH:
+            rv = k.hashCode();
+            break;
+        case CRC32_HASH:
+            // return (crc32(shift) >> 16) & 0x7fff;
+            CRC32 crc32 = new CRC32();
+            crc32.update(KeyUtil.getKeyBytes(k));
+            rv = (crc32.getValue() >> 16) & 0x7fff;
+            break;
+        case FNV1_64_HASH: {
+            // Thanks to pierre@demartines.com for the pointer
+            rv = FNV_64_INIT;
+            int len = k.length();
+            for (int i = 0; i < len; i++) {
+                rv *= FNV_64_PRIME;
+                rv ^= k.charAt(i);
+            }
+        }
+            break;
+        case FNV1A_64_HASH: {
+            rv = FNV_64_INIT;
+            int len = k.length();
+            for (int i = 0; i < len; i++) {
+                rv ^= k.charAt(i);
+                rv *= FNV_64_PRIME;
+            }
+        }
+            break;
+        case FNV1_32_HASH: {
+            rv = FNV_32_INIT;
+            int len = k.length();
+            for (int i = 0; i < len; i++) {
+                rv *= FNV_32_PRIME;
+                rv ^= k.charAt(i);
+            }
+        }
+            break;
+        case FNV1A_32_HASH: {
+            rv = FNV_32_INIT;
+            int len = k.length();
+            for (int i = 0; i < len; i++) {
+                rv ^= k.charAt(i);
+                rv *= FNV_32_PRIME;
+            }
+        }
+            break;
+        case KETAMA_HASH:
+            byte[] bKey = computeMd5(k);
+            rv = ((long) (bKey[3] & 0xFF) << 24)
+                    | ((long) (bKey[2] & 0xFF) << 16)
+                    | ((long) (bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF);
+            break;
+        default:
+            assert false;
+        }
+        return rv & 0xffffffffL; /* Truncate to 32-bits */
+    }
+
+    /**
+     * Get the md5 of the given key.
+     */
+    public static byte[] computeMd5(String k) {
+        MessageDigest md5;
+        try {
+            md5 = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 not supported", e);
+        }
+        md5.reset();
+        md5.update(KeyUtil.getKeyBytes(k));
+        return md5.digest();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java
new file mode 100644
index 0000000..863a34c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+/**
+ * Strategy that turns a key object into a numeric hash.
+ */
+public interface Hasher {
+
+    /**
+     * @param hashKey
+     *            key to hash
+     * @return the hash computed for {@code hashKey}
+     */
+    long hash(Object hashKey);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java
new file mode 100644
index 0000000..ebdfc5b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Describes one resolved key: the path that led to it (property names and
+ * list indexes) together with its value as a string.
+ */
+public class KeyInfo implements Serializable {
+    List<KeyPathElement> keyPath = new ArrayList<KeyPathElement>();
+    String value;
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    public String getValue() {
+        return this.value;
+    }
+
+    /**
+     * Appends a property name to the path.
+     */
+    public void addElementToPath(String keyName) {
+        keyPath.add(new KeyPathElementName(keyName));
+    }
+
+    /**
+     * Appends a list index to the path.
+     */
+    public void addElementToPath(int index) {
+        keyPath.add(new KeyPathElementIndex(index));
+    }
+
+    private void addElementToPath(KeyPathElement keyPathElement) {
+        keyPath.add(keyPathElement);
+    }
+
+    /**
+     * @return the live list of path elements (not a copy)
+     */
+    public List<KeyPathElement> getKeyPath() {
+        return keyPath;
+    }
+
+    /**
+     * Base type of a path element; the element type tells whether it is a
+     * property name or a list index.
+     */
+    public static class KeyPathElement implements Serializable {
+        public enum PathElementType {
+            KEY_NAME, INDEX
+        }
+
+        PathElementType pathElementType;
+
+        public PathElementType getPathElementType() {
+            return pathElementType;
+        }
+
+    }
+
+    /**
+     * Path element holding a property name.
+     */
+    public static class KeyPathElementName extends KeyPathElement {
+        String keyName;
+
+        public KeyPathElementName() {
+            // set the type here too, so an element created through the
+            // no-arg constructor is still recognised as a name
+            pathElementType = PathElementType.KEY_NAME;
+        }
+
+        public KeyPathElementName(String keyName) {
+            pathElementType = PathElementType.KEY_NAME;
+            this.keyName = keyName;
+        }
+
+        public String getKeyName() {
+            return keyName;
+        }
+    }
+
+    /**
+     * Path element holding a list index.
+     */
+    public static class KeyPathElementIndex extends KeyPathElement {
+        int index;
+
+        public KeyPathElementIndex() {
+            // set the type here too, so an element created through the
+            // no-arg constructor is still recognised as an index
+            pathElementType = PathElementType.INDEX;
+        }
+
+        public KeyPathElementIndex(int index) {
+            pathElementType = PathElementType.INDEX;
+            this.index = index;
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+
+    /**
+     * Creates a new KeyInfo with the same path elements. The value is not
+     * copied.
+     */
+    public KeyInfo copy() {
+        KeyInfo newKeyInfo = new KeyInfo();
+        for (KeyPathElement element : keyPath) {
+            newKeyInfo.addElementToPath(element);
+        }
+        return newKeyInfo;
+    }
+
+    /**
+     * @return the path with names only, joined by "/"
+     */
+    @Override
+    public String toString() {
+        return toString(false);
+    }
+
+    /**
+     * Renders the path. Names are joined by "/"; list indexes are rendered as
+     * "[i]" only when {@code showFull} is true.
+     * 
+     * @param showFull
+     *            whether to include list indexes
+     * @return the rendered path, e.g. "array1[2]/val1" or "array1/val1"
+     */
+    public String toString(boolean showFull) {
+        StringBuilder sb = new StringBuilder();
+        for (KeyPathElement element : keyPath) {
+            KeyPathElement.PathElementType type = element.getPathElementType();
+            if (type == KeyPathElement.PathElementType.KEY_NAME) {
+                if (sb.length() > 0) {
+                    sb.append("/");
+                }
+                sb.append(((KeyPathElementName) element).getKeyName());
+            } else if (showFull
+                    && type == KeyPathElement.PathElementType.INDEX) {
+                // only real index elements are cast; an element without a
+                // type is skipped instead of causing a ClassCastException
+                sb.append("[")
+                  .append(((KeyPathElementIndex) element).getIndex())
+                  .append("]");
+            }
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java
new file mode 100644
index 0000000..a527713
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.emitter.EventEmitter;
+import org.apache.s4.listener.EventListener;
+import org.apache.s4.processor.PEContainer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A partitioner that assigns every event to the current partition, as given
+ * by the comm layer.
+ */
+public class LoopbackPartitioner implements Partitioner, VariableKeyPartitioner {
+
+    CommLayerEmitter emitter;
+
+    /**
+     * Builds a single partition entry whose id is the id of the emitter's
+     * listener. The values of the keyed attributes are ignored; the compound
+     * key is the concatenation of all key names, without a separator.
+     * 
+     * @param compoundKeyNames
+     *            key names to concatenate into the compound key
+     * @return a new single-element list holding that entry
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName,
+            List<List<String>> compoundKeyNames, Object event,
+            int partitionCount) {
+        CompoundKeyInfo localPartition = new CompoundKeyInfo();
+        // the partition id comes from the comm layer, not from the event
+        localPartition.setPartitionId(emitter.getListener().getId());
+
+        StringBuilder joinedKeyNames = new StringBuilder();
+        for (List<String> keyPath : compoundKeyNames) {
+            for (String pathElement : keyPath) {
+                joinedKeyNames.append(pathElement);
+            }
+        }
+        localPartition.setCompoundKey(joinedKeyNames.toString());
+
+        List<CompoundKeyInfo> result = new ArrayList<CompoundKeyInfo>();
+        result.add(localPartition);
+        return result;
+    }
+
+    /**
+     * Same as the four-argument variant with an empty list of key names.
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName, Object event,
+            int partitionCount) {
+        List<List<String>> noKeyNames = new ArrayList<List<String>>(0);
+        return partition(streamName, noKeyNames, event, partitionCount);
+    }
+
+    /**
+     * A reference on the emitter allows getting the current partition id from
+     * the comm layer.
+     * 
+     * @param emitter
+     *            comm layer emitter
+     */
+    public void setEventEmitter(CommLayerEmitter emitter) {
+        this.emitter = emitter;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java
new file mode 100644
index 0000000..cc5a1f3
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.List;
+
+/**
+ * Decides which partition(s) an event is sent to.
+ */
+public interface Partitioner {
+
+    /**
+     * @param streamName
+     *            name of the stream the event belongs to
+     * @param event
+     *            event to partition
+     * @param partitionCount
+     *            number of available partitions
+     * @return the partition information for the event; some implementations
+     *         in this package return {@code null} when they do not partition
+     *         the event
+     */
+    List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java
new file mode 100644
index 0000000..eb4fc04
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * {@link Partitioner} that ignores the event content and hands out partition
+ * ids in rotation.
+ * <p>
+ * An optional set of stream names can be configured; when it is set, events
+ * on any other (non-null) stream are not partitioned.
+ * </p>
+ */
+public class RoundRobinPartitioner implements Partitioner {
+    private int counter = 0;
+    private Set<String> streamNameSet;
+
+    /**
+     * Limits this partitioner to the given streams.
+     *
+     * @param streamNames names of the streams this partitioner handles
+     */
+    public void setStreamNames(String[] streamNames) {
+        Set<String> names = new HashSet<String>(streamNames.length);
+        for (int i = 0; i < streamNames.length; i++) {
+            names.add(streamNames[i]);
+        }
+        streamNameSet = names;
+    }
+
+    /**
+     * Assigns the event to the next partition in the rotation.
+     *
+     * @param streamName stream of the event; a <code>null</code> name is
+     *            never filtered out
+     * @param event the event (not inspected)
+     * @param partitionCount number of partitions to rotate over
+     * @return a single-element list holding the chosen partition id, or
+     *         <code>null</code> if stream names were configured and
+     *         <code>streamName</code> is not one of them
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName, Object event,
+            int partitionCount) {
+
+        boolean notHandled = streamName != null && streamNameSet != null
+                && !streamNameSet.contains(streamName);
+        if (notHandled) {
+            return null;
+        }
+
+        CompoundKeyInfo keyInfo = new CompoundKeyInfo();
+        keyInfo.setPartitionId(nextPartitionId(partitionCount));
+
+        List<CompoundKeyInfo> result = new ArrayList<CompoundKeyInfo>();
+        result.add(keyInfo);
+        return result;
+    }
+
+    /**
+     * Advances the shared counter and maps it onto a partition id. The
+     * counter restarts at 0 once incrementing it overflows to a negative
+     * value.
+     */
+    private synchronized int nextPartitionId(int partitionCount) {
+        if (++counter < 0) {
+            counter = 0;
+        }
+        return counter % partitionCount;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java
new file mode 100644
index 0000000..b6f1d31
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDefaultPartitioner {
+    public static void main(String[] args) {
+        DefaultPartitioner dp1 = new DefaultPartitioner();
+        DefaultPartitioner dp2 = new DefaultPartitioner();
+        dp1.setDebug(true);
+        dp1.setHashKey(new String[] { "list1/val1", "list1/val2", "query" });
+        dp1.setHasher(new DefaultHasher());
+
+        dp2.setDebug(true);
+        dp2.setHashKey(new String[] { "user" });
+        dp2.setHasher(new DefaultHasher());
+
+        TopLevel tl1 = new TopLevel();
+        tl1.setQuery("Hello there");
+        tl1.setUser("spitzer");
+
+        for (int i = 0; i < 4; i++) {
+            Nested n = new Nested();
+            n.setVal1(i + 77);
+            n.setVal2(i / 10.7);
+            tl1.addNested(n);
+        }
+
+        dp1.partition("test", tl1, 4);
+        dp2.partition("test", tl1, 4);
+
+    }
+
+    static class TopLevel {
+        private String query;
+        private List<Nested> list1 = new ArrayList<Nested>();
+        private String user;
+
+        public String getQuery() {
+            return query;
+        }
+
+        public void setQuery(String query) {
+            this.query = query;
+        }
+
+        public List<Nested> getList1() {
+            return list1;
+        }
+
+        public void setList1(List<Nested> list1) {
+            this.list1 = list1;
+        }
+
+        public String getUser() {
+            return user;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public void addNested(Nested nested) {
+            list1.add(nested);
+        }
+    }
+
+    static class Nested {
+        long val1;
+        double val2;
+
+        public Nested() {
+
+        }
+
+        public long getVal1() {
+            return val1;
+        }
+
+        public void setVal1(long val1) {
+            this.val1 = val1;
+        }
+
+        public double getVal2() {
+            return val2;
+        }
+
+        public void setVal2(double val2) {
+            this.val2 = val2;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java
new file mode 100644
index 0000000..722de91
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.List;
+
+/**
+ * Partitioning strategy whose key names are passed in with each call.
+ */
+public interface VariableKeyPartitioner {
+    /**
+     * Computes partitioning information for one event using the given keys.
+     *
+     * @param streamName name of the stream the event was received on
+     * @param keyNames the key names to partition on (presumably one inner
+     *            list per compound key; confirm against implementations)
+     * @param event the event to partition
+     * @param partitionCount number of partitions to choose from
+     * @return the partitioning information for the event
+     */
+    List<CompoundKeyInfo> partition(String streamName,
+                                    List<List<String>> keyNames,
+                                    Object event, int partitionCount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java b/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java
new file mode 100644
index 0000000..17cff66
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.dispatcher.transformer;
+
+
+/**
+ * Converts an event into another object.
+ */
+public interface Transformer {
+    /**
+     * Transforms the given event.
+     *
+     * @param event the event to transform
+     * @return the result of the transformation
+     */
+    Object transform(Object event);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java b/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java
new file mode 100644
index 0000000..155f933
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.emitter;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.S4_EVENT_METRICS;
+import static org.apache.s4.util.MetricsName.low_level_emitter_msg_out_ct;
+import static org.apache.s4.util.MetricsName.low_level_emitter_out_err_ct;
+import static org.apache.s4.util.MetricsName.low_level_emitter_qsz;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.comm.core.SenderProcess;
+import org.apache.s4.comm.core.Serializer;
+import org.apache.s4.listener.CommLayerListener;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.message.Request;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.log4j.Logger;
+
+/**
+ * {@link EventEmitter} that queues outgoing events and sends them through the
+ * comm layer from a dedicated thread.
+ * <p>
+ * {@link #emit(int, EventWrapper)} only enqueues the event. The thread started
+ * by {@link #init()} takes messages off the queue, serializes them with the
+ * configured {@link SerializerDeserializer} and passes the bytes to a
+ * {@link SenderProcess}.
+ * </p>
+ */
+public class CommLayerEmitter implements EventEmitter, Runnable {
+    private static Logger logger = Logger.getLogger(CommLayerEmitter.class);
+
+    // config for emitter is coupled with listener, if there is a non-null
+    // listener. This prevents 2 tasks from being checked out of the cluster
+    // config for the same node: one for the listener and another for the
+    // emitter.
+    private CommLayerListener listener;
+
+    private SenderProcess sender;
+    // Written by the emitter thread in run() and read by callers of
+    // getNodeCount(); volatile so that those readers see the update.
+    private volatile int nodeCount;
+    private BlockingQueue<MessageHolder> messageQueue = new LinkedBlockingDeque<MessageHolder>();
+    private String senderId;
+    private String clusterManagerAddress;
+    private String appName;
+    private String listenerAppName = null;
+    private Monitor monitor;
+    private SerializerDeserializer serDeser;
+
+    public void setSerDeser(SerializerDeserializer serDeser) {
+        this.serDeser = serDeser;
+    }
+
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
+
+    public void setSenderId(String senderId) {
+        this.senderId = senderId;
+    }
+
+    public void setClusterManagerAddress(String clusterManagerAddress) {
+        this.clusterManagerAddress = clusterManagerAddress;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
+    }
+
+    public void setListenerAppName(String listenerAppName) {
+        this.listenerAppName = listenerAppName;
+    }
+
+    public void setListener(CommLayerListener listener) {
+        this.listener = listener;
+    }
+
+    public CommLayerListener getListener() {
+        return this.listener;
+    }
+
+    /**
+     * Starts the thread that drains the message queue (see {@link #run()}).
+     */
+    public void init() {
+
+        Thread t = new Thread(this, "CommLayerEmitter");
+        t.start();
+    }
+
+    /**
+     * Adds a message to the outgoing queue and, if a monitor is set, reports
+     * the current queue size. Failures while reporting the metric are logged
+     * and do not affect the queued message.
+     *
+     * @param messageHolder the message to queue
+     */
+    public void queueMessage(MessageHolder messageHolder) {
+        messageQueue.add(messageHolder);
+        try {
+            if (monitor != null) {
+                monitor.set(low_level_emitter_qsz.toString(),
+                            messageQueue.size(),
+                            S4_CORE_METRICS.toString());
+            }
+        } catch (Exception e) {
+            logger.error("Exception in monitor metrics on thread "
+                    + Thread.currentThread().getId(), e);
+        }
+    }
+
+    /**
+     * Queues the event for asynchronous sending to the given partition.
+     *
+     * @param partitionId destination partition
+     * @param eventWrapper the event to send
+     * @throws RuntimeException rethrown, after being counted and logged, if
+     *             queueing the message fails
+     */
+    @Override
+    public void emit(int partitionId, EventWrapper eventWrapper) {
+
+        // Special handling required for Requests
+        if (eventWrapper.getEvent() instanceof Request) {
+            decorateRequest((Request) eventWrapper.getEvent());
+        }
+
+        try {
+            MessageHolder mh = new MessageHolder(partitionId, eventWrapper);
+            queueMessage(mh);
+        } catch (RuntimeException rte) {
+            if (monitor != null) {
+                monitor.increment(low_level_emitter_out_err_ct.toString(),
+                                  1,
+                                  S4_EVENT_METRICS.toString(),
+                                  "et",
+                                  eventWrapper.getStreamName());
+            }
+            Logger.getLogger("s4").error("Error serializing or emitting event "
+                                                 + eventWrapper.getEvent(),
+                                         rte);
+            throw rte;
+        }
+    }
+
+    // Add partition id of sender
+    private void decorateRequest(Request r) {
+        Request.RInfo rinfo = r.getRInfo();
+
+        if (rinfo != null && listener != null)
+            rinfo.setPartition(listener.getId());
+    }
+
+    /**
+     * @return 1 when no listener is configured; otherwise the number of
+     *         partitions reported by the sender once it has been created
+     */
+    @Override
+    public int getNodeCount() {
+        if (listener == null) {
+            return 1;
+        }
+        return nodeCount;
+    }
+
+    /**
+     * Creates the sender, then serializes and sends queued messages until the
+     * thread is interrupted.
+     * <p>
+     * Without a listener the sender acquires its own task. With a listener,
+     * this method first waits until the listener exposes its configuration
+     * and creates the sender from it. If the thread is interrupted during
+     * that wait, the interrupt status is restored and the method returns
+     * without sending anything.
+     * </p>
+     */
+    @Override
+    public void run() {
+        if (listener == null) {
+            if (listenerAppName == null) {
+                listenerAppName = appName;
+            }
+            sender = new SenderProcess(clusterManagerAddress,
+                                       appName,
+                                       listenerAppName);
+            Map<String, String> map = new HashMap<String, String>();
+            map.put("SenderId", "" + senderId);
+            sender.setSerializer(new PassThroughSerializer());
+            sender.acquireTaskAndCreateSender(map);
+        } else {
+            Object listenerConfig = null;
+            try {
+                listenerConfig = listener.getListenerConfig();
+                if (listenerConfig == null) {
+                    logger.info("CommLayerEmitter going to wait for listener to acquire task");
+                    synchronized (listener) {
+                        // Re-check the condition in a loop: wait() may return
+                        // on a spurious wakeup or an unrelated notification
+                        // before the configuration is available.
+                        while ((listenerConfig = listener.getListenerConfig()) == null) {
+                            listener.wait();
+                        }
+                    }
+                }
+            } catch (InterruptedException ie) {
+                // Interrupted before the sender could be created: keep the
+                // interrupt status for whoever owns the thread and stop.
+                logger.info("Exception in CommLayerEmitter.run()", ie);
+                Thread.currentThread().interrupt();
+                return;
+            } catch (Exception e) {
+                logger.info("Exception in CommLayerEmitter.run()", e);
+            }
+            logger.info("Creating sender process with " + listenerConfig);
+
+            String destinationAppName = (listenerAppName != null
+                    ? listenerAppName : listener.getAppName());
+
+            sender = new SenderProcess(listener.getClusterManagerAddress(),
+                                       listener.getAppName(),
+                                       destinationAppName);
+
+            sender.setSerializer(new PassThroughSerializer());
+            sender.createSenderFromConfig(listenerConfig);
+            nodeCount = sender.getNumOfPartitions();
+        }
+        boolean isSent = false;
+        while (!Thread.interrupted()) {
+            isSent = false;
+            try {
+                MessageHolder mh = messageQueue.take();
+                byte[] rawMessage = serDeser.serialize(mh.getEventWrapper());
+                if (listener == null) {
+                    isSent = sender.send(rawMessage);
+                } else {
+                    isSent = sender.sendToPartition(mh.getPartitionId(),
+                                                    rawMessage);
+                }
+
+                if (isSent) {
+                    if (monitor != null) {
+                        monitor.increment(low_level_emitter_msg_out_ct.toString(),
+                                          1,
+                                          S4_CORE_METRICS.toString());
+                    }
+                } else {
+                    if (monitor != null) {
+                        monitor.increment(low_level_emitter_out_err_ct.toString(),
+                                          1,
+                                          S4_CORE_METRICS.toString());
+                    }
+                    logger.warn("commlayer emit failed ...");
+                }
+            } catch (InterruptedException ie) {
+                if (monitor != null) {
+                    monitor.increment(low_level_emitter_out_err_ct.toString(),
+                                      1,
+                                      S4_CORE_METRICS.toString());
+                }
+                // restore the flag so the loop condition ends the thread
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                Logger.getLogger("s4").error("Error emitting message", e);
+            }
+        }
+    }
+
+    /**
+     * Serializer for input that is already a byte array: returns it as is,
+     * and returns <code>null</code> for any other input.
+     */
+    public class PassThroughSerializer implements Serializer {
+        public byte[] serialize(Object input) {
+            if (input instanceof byte[]) {
+                return (byte[]) input;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Queue entry pairing an event with its destination partition.
+     */
+    class MessageHolder {
+        private final int partitionId;
+        private final EventWrapper eventWrapper;
+
+        MessageHolder(int partitionId, EventWrapper eventWrapper) {
+            this.partitionId = partitionId;
+            this.eventWrapper = eventWrapper;
+        }
+
+        int getPartitionId() {
+            return partitionId;
+        }
+
+        EventWrapper getEventWrapper() {
+            return eventWrapper;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java b/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java
new file mode 100644
index 0000000..715eb2e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.emitter;
+
+import org.apache.s4.collector.EventWrapper;
+
+/**
+ * Sends events to partitions.
+ */
+public interface EventEmitter {
+    /**
+     * Emits an event to the given partition.
+     *
+     * @param partitionId id of the destination partition
+     * @param eventWrapper the wrapped event to emit
+     */
+    void emit(int partitionId, EventWrapper eventWrapper);
+
+    /**
+     * @return the number of nodes this emitter can send to
+     */
+    int getNodeCount();
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java
new file mode 100644
index 0000000..f5ee6b2
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+/**
+ * 
+ * <p>
+ * This class defines a checkpointing event (either a request for checkpoint or for recovery).
+ * </p>
+ * <p>
+ * Checkpointing events are queued in the PE event queue and processed according to the PE processor scheduler (FIFO).
+ * </p>
+ */
+public abstract class CheckpointingEvent {
+
+    private SafeKeeperId safeKeeperId;
+
+    /**
+     * No-argument constructor, needed by the serialization framework.
+     */
+    public CheckpointingEvent() {
+    }
+
+    /**
+     * Creates an event for the PE identified by the given id.
+     *
+     * @param safeKeeperId id of the PE subject to checkpointing or recovery
+     */
+    public CheckpointingEvent(SafeKeeperId safeKeeperId) {
+        this.safeKeeperId = safeKeeperId;
+    }
+
+    /**
+     * @return the id of the PE this event applies to
+     */
+    public SafeKeeperId getSafeKeeperId() {
+        return this.safeKeeperId;
+    }
+
+    /**
+     * @param id the id of the PE this event applies to
+     */
+    public void setSafeKeeperId(SafeKeeperId id) {
+        safeKeeperId = id;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java b/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java
new file mode 100644
index 0000000..7362af8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * Implementation of a file system backend storage to persist checkpoints.
+ * </p>
+ * <p>
+ * The file system may be the default local file system when running on a single
+ * machine, but should be a distributed file system such as NFS when running on
+ * a cluster.
+ * </p>
+ * <p>
+ * Checkpoints are stored in individual files (1 file = 1 safeKeeperId) in
+ * directories according to the following structure:
+ * <code>(storageRootpath)/prototypeId/safeKeeperId</code>
+ * </p>
+ * 
+ */
+public class DefaultFileSystemStateStorage implements StateStorage {
+
+    private static org.apache.log4j.Logger logger = Logger.getLogger(DefaultFileSystemStateStorage.class);
+    private String storageRootPath;
+
+    public DefaultFileSystemStateStorage() {
+    }
+
+    /**
+     * <p>
+     * Called by the dependency injection framework, after construction.
+     * </p>
+     */
+    public void init() {
+        checkStorageDir();
+    }
+
+    /**
+     * Reads the checkpoint stored for the given key.
+     *
+     * @param key id of the checkpoint to read
+     * @return the content of the checkpoint file, or <code>null</code> if
+     *         there is no such file or it could not be read completely (the
+     *         error is logged)
+     */
+    @Override
+    public byte[] fetchState(SafeKeeperId key) {
+        File file = safeKeeperID2File(key, storageRootPath);
+        if (file != null && file.exists()) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("Fetching " + file.getAbsolutePath() + "for : " + key);
+            }
+            // TODO use commons-io or guava
+            FileInputStream in = null;
+            try {
+                in = new FileInputStream(file);
+
+                long length = file.length();
+
+                /*
+                 * Arrays can only be created using int types, so ensure that
+                 * the file size is not too big before we downcast to create the
+                 * array.
+                 */
+                if (length > Integer.MAX_VALUE) {
+                    throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
+                }
+
+                byte[] buffer = new byte[(int) length];
+                int offSet = 0;
+                int numRead = 0;
+
+                // read() may return fewer bytes than requested, so keep
+                // reading until the buffer is full or end of file is reached
+                while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
+                    offSet += numRead;
+                }
+
+                if (offSet < buffer.length) {
+                    throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
+                            + buffer.length + " bytes read");
+                }
+
+                // the stream is closed exactly once, in the finally block
+                return buffer;
+            } catch (FileNotFoundException e1) {
+                logger.error(e1.getMessage(), e1);
+            } catch (IOException e2) {
+                logger.error(e2.getMessage(), e2);
+            } finally {
+                if (in != null) {
+                    try {
+                        in.close();
+                    } catch (Exception e) {
+                        logger.warn(e.getMessage(), e);
+                    }
+                }
+            }
+        }
+        return null;
+
+    }
+
+    /**
+     * Lists the keys of all checkpoints found under the storage root, i.e.
+     * one key per regular file in each direct subdirectory of the root.
+     *
+     * @return the keys found; empty if the storage root cannot be listed
+     */
+    @Override
+    public Set<SafeKeeperId> fetchStoredKeys() {
+        Set<SafeKeeperId> keys = new HashSet<SafeKeeperId>();
+        File rootDir = new File(storageRootPath);
+        File[] dirs = rootDir.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        // listFiles() returns null when the path is not a directory or an
+        // I/O error occurs
+        if (dirs == null) {
+            logger.warn("Cannot list content of storage root directory : " + storageRootPath);
+            return keys;
+        }
+        for (File dir : dirs) {
+            File[] files = dir.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File file) {
+                    return (file.isFile());
+                }
+            });
+            if (files == null) {
+                logger.warn("Cannot list content of directory : " + dir.getAbsolutePath());
+                continue;
+            }
+            for (File file : files) {
+                keys.add(file2SafeKeeperID(file));
+            }
+        }
+        return keys;
+    }
+
+    // files kept as : root/<prototypeId>/encodedKeyWithFullInfo
+    // NOTE(review): getBytes() without a charset uses the platform default
+    // charset; left unchanged to stay compatible with already stored files.
+    private static File safeKeeperID2File(SafeKeeperId key, String storageRootPath) {
+
+        return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
+                + Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes()));
+    }
+
+    // inverse of safeKeeperID2File: the file name is the encoded key
+    private static SafeKeeperId file2SafeKeeperID(File file) {
+        return new SafeKeeperId(new String(Base64.decodeBase64(file.getName())));
+    }
+
+    public String getStorageRootPath() {
+        return storageRootPath;
+    }
+
+    /**
+     * Sets the storage root and creates the directory if it does not exist.
+     * A failure to create it is logged.
+     *
+     * @param storageRootPath path of the root directory for checkpoints
+     */
+    public void setStorageRootPath(String storageRootPath) {
+        this.storageRootPath = storageRootPath;
+        File rootPathFile = new File(storageRootPath);
+        if (!rootPathFile.exists()) {
+            if (!rootPathFile.mkdirs()) {
+                logger.error("could not create root storage directory : " + storageRootPath);
+            }
+
+        }
+    }
+
+    /**
+     * Falls back to <code>(user.dir)/tmp/storage</code> when no storage root
+     * was configured, creating that directory if needed. A failure to create
+     * it is logged.
+     */
+    public void checkStorageDir() {
+        if (storageRootPath == null) {
+
+            File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+                    + "storage");
+            storageRootPath = defaultStorageDir.getAbsolutePath();
+            if (logger.isInfoEnabled()) {
+                logger.info("Unspecified storage dir; using default dir: " + defaultStorageDir.getAbsolutePath());
+            }
+            if (!defaultStorageDir.exists()) {
+                if (!(defaultStorageDir.mkdirs())) {
+                    logger.error("Storage directory not specified, and cannot create default storage directory : "
+                            + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes a checkpoint to its file, replacing any previous one, and
+     * reports the outcome through the callback.
+     *
+     * @param key id of the checkpoint
+     * @param state serialized state to store
+     * @param callback notified with SUCCESS once the state is written, or
+     *            with FAILURE and a message if a directory or file operation
+     *            fails
+     */
+    @Override
+    public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+        File f = safeKeeperID2File(key, storageRootPath);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
+        }
+        if (!f.exists()) {
+            // parent file has prototype id
+            File prototypeDir = f.getParentFile();
+            if (!prototypeDir.exists()) {
+                // mkdirs() also creates a missing storage root; the
+                // isDirectory() check tolerates a concurrent creation of the
+                // same directory
+                if (!prototypeDir.mkdirs() && !prototypeDir.isDirectory()) {
+                    callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+                            "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
+                                    + prototypeDir.getAbsolutePath());
+                    return;
+                }
+            }
+            try {
+                f.createNewFile();
+            } catch (IOException e) {
+                callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+                        key.toString() + " : " + e.getMessage());
+                return;
+            }
+        } else {
+            if (!f.delete()) {
+                // report the checkpoint file itself, not its directory
+                callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+                        "Cannot delete previously saved checkpoint file [" + f.getAbsolutePath() + "]");
+                return;
+            }
+        }
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(f);
+            fos.write(state);
+            callback.storageOperationResult(SafeKeeper.StorageResultCode.SUCCESS, key.toString());
+        } catch (FileNotFoundException e) {
+            callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+                    key.toString() + " : " + e.getMessage());
+        } catch (IOException e) {
+            callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+                    key.toString() + " : " + e.getMessage());
+        } finally {
+            try {
+                if (fos != null) {
+                    fos.close();
+                }
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+            }
+        }
+    }
+
+}