You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC

[37/50] [abbrv] Rename packages in preparation for move to Apache

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java b/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
new file mode 100644
index 0000000..d1a04b4
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+/**
+ * Event that triggers a checkpoint.
+ */
+public class InitiateCheckpointingEvent extends CheckpointingEvent {
+
+    /** No-arg constructor, as required by the default kryo serializer. */
+    public InitiateCheckpointingEvent() {
+        super();
+    }
+
+    /**
+     * @param safeKeeperId
+     *            identifier handed to the {@link CheckpointingEvent} constructor
+     */
+    public InitiateCheckpointingEvent(SafeKeeperId safeKeeperId) {
+        super(safeKeeperId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java b/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java
new file mode 100644
index 0000000..38c6e2a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+import org.apache.s4.ft.SafeKeeper.StorageResultCode;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A factory for creating storage callbacks that simply log callback results.
+ */
+public class LoggingStorageCallbackFactory implements StorageCallbackFactory {
+
+    /** Creates a new {@link StorageCallbackLogger} on every call. */
+    @Override
+    public StorageCallback createStorageCallback() {
+        return new StorageCallbackLogger();
+    }
+
+    /**
+     * A basic storage callback that simply logs results from storage
+     * operations: successes at debug level, anything else at warn level.
+     */
+    static class StorageCallbackLogger implements StorageCallback {
+
+        private static final Logger logger = Logger.getLogger("s4-ft");
+
+        @Override
+        public void storageOperationResult(StorageResultCode code, Object message) {
+            if (StorageResultCode.SUCCESS == code) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Callback from storage: " + code.name() + " : " + message);
+                }
+            } else {
+                // log the code actually received (possibly null) instead of assuming FAILURE
+                logger.warn("Callback from storage: " + code + " : " + message);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java b/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java
new file mode 100644
index 0000000..26a3530
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+/**
+ * Event that triggers the recovery of a checkpoint.
+ */
+public class RecoveryEvent extends CheckpointingEvent {
+
+    /** No-arg constructor, as required by the default kryo serializer. */
+    public RecoveryEvent() {
+        super();
+    }
+
+    /** Builds a recovery event for the given safeKeeperId. */
+    public RecoveryEvent(SafeKeeperId safeKeeperId) {
+        super(safeKeeperId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java b/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java
new file mode 100644
index 0000000..d2656a6
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+import org.apache.s4.ft.SafeKeeper.StorageResultCode;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
+/**
+ * <p>
+ * This class implements a storage backend based on Redis. Redis is a key-value
+ * store. See <a href="http://redis.io/">http://redis.io/</a> for details.
+ * </p>
+ * <p>
+ * Redis must be running as an external service. The host and port of this
+ * external service must be injected before {@link #init()} is called.
+ * </p>
+ */
+public class RedisStateStorage implements StateStorage {
+
+    static Logger logger = Logger.getLogger("s4-ft");
+    // explicit charset for key bytes, so stored keys do not depend on the platform default
+    private static final java.nio.charset.Charset KEY_CHARSET = java.nio.charset.Charset.forName("UTF-8");
+    private JedisPool jedisPool;
+    private String redisHost;
+    private int redisPort;
+
+    /** Calls Jedis flushAll() on a pooled connection, then returns the connection to the pool. */
+    public void clear() {
+        Jedis jedis = jedisPool.getResource();
+        try {
+            jedis.flushAll();
+        } finally {
+            jedisPool.returnResource(jedis);
+        }
+    }
+
+    /** Creates the connection pool from the injected host and port. */
+    public void init() {
+        // TODO optional parameterization
+        jedisPool = new JedisPool(new JedisPoolConfig(), redisHost, redisPort);
+    }
+
+    /** Writes the state under the key's string representation and notifies the callback. */
+    @Override
+    public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+        Jedis jedis = jedisPool.getResource();
+        String statusCode = "UNKNOWN";
+        try {
+            statusCode = jedis.set(key.getStringRepresentation().getBytes(KEY_CHARSET), state);
+        } finally {
+            jedisPool.returnResource(jedis);
+        }
+        if ("OK".equals(statusCode)) {
+            callback.storageOperationResult(StorageResultCode.SUCCESS, "Redis result code is [" + statusCode + "] for key [" + key.toString() +"]");
+        } else {
+            callback.storageOperationResult(StorageResultCode.FAILURE, "Unexpected redis result code : [" + statusCode + "] for key [" + key.toString() +"]");
+        }
+    }
+
+    /** Returns the value Jedis reports for the key's string representation. */
+    @Override
+    public byte[] fetchState(SafeKeeperId key) {
+        Jedis jedis = jedisPool.getResource();
+        try {
+            return jedis.get(key.getStringRepresentation().getBytes(KEY_CHARSET));
+        } finally {
+            jedisPool.returnResource(jedis);
+        }
+    }
+
+    /** Fetches every key matching <code>*</code> and parses each one into a {@link SafeKeeperId}. */
+    @Override
+    public Set<SafeKeeperId> fetchStoredKeys() {
+        Jedis jedis = jedisPool.getResource();
+        try {
+            Set<String> keys = jedis.keys("*");
+            Set<SafeKeeperId> result = new HashSet<SafeKeeperId>(keys.size());
+            for (String s : keys) {
+                result.add(new SafeKeeperId(s));
+            }
+            return result;
+        } finally {
+            jedisPool.returnResource(jedis);
+        }
+    }
+
+    public String getRedisHost() {
+        return redisHost;
+    }
+
+    public void setRedisHost(String redisHost) {
+        this.redisHost = redisHost;
+    }
+
+    public int getRedisPort() {
+        return redisPort;
+    }
+
+    public void setRedisPort(int redisPort) {
+        this.redisPort = redisPort;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
new file mode 100644
index 0000000..950649b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+import org.apache.s4.dispatcher.Dispatcher;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * This class is responsible for coordinating interactions between the S4 event
+ * processor and the checkpoint storage backend. In particular, it schedules
+ * asynchronous save tasks to be executed on the backend.
+ * </p>
+ */
+public class SafeKeeper {
+
+    public enum StorageResultCode {
+        SUCCESS, FAILURE
+    }
+
+    private static final Logger logger = Logger.getLogger("s4-ft");
+    private StateStorage stateStorage;
+    private Dispatcher loopbackDispatcher;
+    private SerializerDeserializer serializer;
+    private Hasher hasher;
+    // monitor field injection through a latch
+    private CountDownLatch signalReady = new CountDownLatch(2);
+    private CountDownLatch signalNodesAvailable = new CountDownLatch(1);
+    private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
+
+    ThreadPoolExecutor threadPool;
+
+    int maxWriteThreads = 1;
+    int writeThreadKeepAliveSeconds = 120;
+    int maxOutstandingWriteRequests = 1000;
+
+    public SafeKeeper() {
+    }
+
+    /**
+     * <p>
+     * This init() method <b>must</b> be called by the dependency injection
+     * framework. It waits until all required dependencies are injected in
+     * SafeKeeper, and until the node count is accessible from the communication
+     * layer. An interruption does not abort it; the interrupt status is restored on exit.
+     * </p>
+     */
+    public void init() {
+        boolean interrupted = false;
+        try {
+            getReadySignal().await();
+        } catch (InterruptedException e1) {
+            logger.warn("Interrupted while waiting for dependencies to be injected", e1);
+            interrupted = true;
+        }
+        threadPool = new ThreadPoolExecutor(1, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
+        logger.debug("Started thread pool with maxWriteThreads=[" + maxWriteThreads
+                + "], writeThreadKeepAliveSeconds=[" + writeThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
+                + maxOutstandingWriteRequests + "]");
+
+        int nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
+        // required wait until nodes are available
+        while (nodeCount == 0) {
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException e) {
+                // keep polling, but remember the interruption
+                interrupted = true;
+            }
+            nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
+        }
+
+        signalNodesAvailable.countDown();
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Forwards a call to checkpoint a PE to the backend storage. A submission
+     * rejected by the thread pool is reported as a FAILURE to a storage callback.
+     * @param safeKeeperId
+     *            safeKeeperId
+     * @param serializedState
+     *            checkpoint data
+     */
+    public void saveState(SafeKeeperId safeKeeperId, byte[] serializedState) {
+        try {
+            threadPool.submit(createSaveStateTask(safeKeeperId, serializedState));
+        } catch (RejectedExecutionException e) {
+            StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
+                            + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests
+                            + "]");
+        }
+    }
+
+    private SaveStateTask createSaveStateTask(SafeKeeperId safeKeeperId, byte[] serializedState) {
+        return new SaveStateTask(safeKeeperId, serializedState, storageCallbackFactory.createStorageCallback(),
+                stateStorage);
+    }
+
+    /**
+     * Fetches checkpoint data from storage for a given PE, after waiting for
+     * the signal that {@link #init()} raises once nodes are available.
+     *
+     * @param key
+     *            safeKeeperId
+     * @return checkpoint data
+     */
+    public byte[] fetchSerializedState(SafeKeeperId key) {
+        try {
+            signalNodesAvailable.await();
+        } catch (InterruptedException e) {
+            // fetch anyway, as before, but do not lose the interruption
+            Thread.currentThread().interrupt();
+        }
+        return stateStorage.fetchState(key);
+    }
+
+    /**
+     * Generates a checkpoint event for a given PE, and enqueues it in the local
+     * event queue. A PE without a key value is skipped with a warning.
+     *
+     * @param pe
+     *            reference to a PE
+     */
+    public void generateCheckpoint(AbstractPE pe) {
+        if (pe.getKeyValueString() == null) {
+            logger.warn("Only keyed PEs can be checkpointed. Current PE [" + pe.getSafeKeeperId()
+                    + "] will not be checkpointed.");
+            return;
+        }
+        List<String> list = new ArrayList<String>(1);
+        list.add("");
+        List<List<String>> compoundKeyNames = new ArrayList<List<String>>(1);
+        compoundKeyNames.add(list);
+        loopbackDispatcher.dispatchEvent(pe.getId() + "_checkpointing", compoundKeyNames,
+                new InitiateCheckpointingEvent(pe.getSafeKeeperId()));
+    }
+
+    /**
+     * Generates a recovery event, and enqueues it in the local event queue.<br/>
+     * This can be used for an eager recovery mechanism.
+     * 
+     * @param safeKeeperId
+     *            safeKeeperId to recover
+     */
+    public void initiateRecovery(SafeKeeperId safeKeeperId) {
+        RecoveryEvent recoveryEvent = new RecoveryEvent(safeKeeperId);
+        loopbackDispatcher.dispatchEvent(safeKeeperId.getPrototypeId() + "_recovery", recoveryEvent);
+    }
+
+    public void setSerializer(SerializerDeserializer serializer) {
+        this.serializer = serializer;
+    }
+
+    public SerializerDeserializer getSerializer() {
+        return serializer;
+    }
+
+    public int getPartitionId() {
+        return ((CommLayerEmitter) loopbackDispatcher.getEventEmitter()).getListener().getId();
+    }
+
+    public void setHasher(Hasher hasher) {
+        this.hasher = hasher;
+        signalReady.countDown();
+    }
+
+    public Hasher getHasher() {
+        return hasher;
+    }
+
+    public void setStateStorage(StateStorage stateStorage) {
+        this.stateStorage = stateStorage;
+    }
+
+    public StateStorage getStateStorage() {
+        return stateStorage;
+    }
+
+    public void setLoopbackDispatcher(Dispatcher dispatcher) {
+        this.loopbackDispatcher = dispatcher;
+        signalReady.countDown();
+    }
+
+    public Dispatcher getLoopbackDispatcher() {
+        return this.loopbackDispatcher;
+    }
+
+    public CountDownLatch getReadySignal() {
+        return signalReady;
+    }
+
+    public StorageCallbackFactory getStorageCallbackFactory() {
+        return storageCallbackFactory;
+    }
+
+    public void setStorageCallbackFactory(StorageCallbackFactory storageCallbackFactory) {
+        this.storageCallbackFactory = storageCallbackFactory;
+    }
+
+    public int getMaxWriteThreads() {
+        return maxWriteThreads;
+    }
+
+    public void setMaxWriteThreads(int maxWriteThreads) {
+        this.maxWriteThreads = maxWriteThreads;
+    }
+
+    public int getWriteThreadKeepAliveSeconds() {
+        return writeThreadKeepAliveSeconds;
+    }
+
+    public void setWriteThreadKeepAliveSeconds(int writeThreadKeepAliveSeconds) {
+        this.writeThreadKeepAliveSeconds = writeThreadKeepAliveSeconds;
+    }
+
+    public int getMaxOutstandingWriteRequests() {
+        return maxOutstandingWriteRequests;
+    }
+
+    public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
+        this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java
new file mode 100644
index 0000000..0095e7e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <p>
+ * Identifier of PEs. It is used to identify checkpointed PEs in the storage
+ * backend.
+ * </p>
+ * <p>
+ * The storage backend is responsible for converting this identifier to whatever
+ * internal representation is most suitable for it.
+ * </p>
+ * <p>
+ * This class provides methods for getting a compact String representation of
+ * the identifier and for creating an identifier from a String representation.
+ * </p>
+ */
+public class SafeKeeperId {
+
+    private String prototypeId;
+    private String keyed;
+
+    private static final Pattern STRING_REPRESENTATION_PATTERN = Pattern
+            .compile("\\[(\\S*)\\];\\[(\\S*)\\]");
+
+    public SafeKeeperId() {
+    }
+
+    /**
+     * Builds an identifier from its two components.
+     * @param prototypeID
+     *            id of the PE as returned by {@link ProcessingElement#getId()
+     *            getId()} method
+     * @param keyed
+     *            keyed attribute(s)
+     */
+    public SafeKeeperId(String prototypeID, String keyed) {
+        this.prototypeId = prototypeID;
+        this.keyed = keyed;
+    }
+
+    /**
+     * Parses an identifier from the format produced by
+     * {@link #getStringRepresentation()}, i.e. <code>[prototypeId];[key]</code>.
+     * An empty bracket pair yields a <code>null</code> field.
+     *
+     * @param keyAsString
+     *            string representation of the identifier
+     * @throws IllegalStateException
+     *             if no <code>[...];[...]</code> sequence is found in keyAsString
+     */
+    public SafeKeeperId(String keyAsString) {
+        Matcher matcher = STRING_REPRESENTATION_PATTERN.matcher(keyAsString);
+        if (!matcher.find()) {
+            // same exception type Matcher.group() raised here before, now naming the input
+            throw new IllegalStateException("Cannot parse SafeKeeperId from [" + keyAsString + "]");
+        }
+        prototypeId = "".equals(matcher.group(1)) ? null : matcher.group(1);
+        keyed = "".equals(matcher.group(2)) ? null : matcher.group(2);
+    }
+
+    public String getKey() {
+        return keyed;
+    }
+
+    public String getPrototypeId() {
+        return prototypeId;
+    }
+
+    @Override
+    public String toString() {
+        return "[PROTO_ID];[KEYED] --> " + getStringRepresentation();
+    }
+
+    /** Returns <code>[prototypeId];[key]</code>, with null fields rendered as empty. */
+    public String getStringRepresentation() {
+        return "[" + (prototypeId == null ? "" : prototypeId) + "];["
+                + (keyed == null ? "" : keyed) + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return getStringRepresentation().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+        SafeKeeperId other = (SafeKeeperId) obj;
+        return nullSafeEquals(keyed, other.keyed) && nullSafeEquals(prototypeId, other.prototypeId);
+    }
+
+    private static boolean nullSafeEquals(String a, String b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
new file mode 100644
index 0000000..79a881d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+
+/** Encapsulates a checkpoint request. It is scheduled by the checkpointing framework. */
+public class SaveStateTask implements Runnable {
+    SafeKeeperId safeKeeperId;
+    byte[] state;
+    StorageCallback storageCallback;
+    StateStorage stateStorage;
+
+    public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
+        this.safeKeeperId = safeKeeperId;
+        this.state = state;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+
+    @Override
+    public void run() {
+        try {
+            stateStorage.saveState(safeKeeperId, state, storageCallback);
+        } catch (RuntimeException e) {
+            // whoever runs this task may discard the exception: report it as a FAILURE, then rethrow
+            storageCallback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, "Could not save state for [" + safeKeeperId + "] : " + e);
+            throw e;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java b/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java
new file mode 100644
index 0000000..6df9f18
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+import java.util.Set;
+
+/**
+ * <p>
+ * Contract that a backend storage for checkpoints has to fulfil: saving a
+ * checkpoint, fetching it back, and listing the stored identifiers.
+ * </p>
+ *
+ */
+public interface StateStorage {
+
+    /**
+     * Stores a checkpoint.
+     *
+     * <p>
+     * NOTE: there is no return value for failure/success, because every
+     * failure/success notification goes through the StorageCallback reference.
+     * </p>
+     * @param key
+     *            safeKeeperId
+     * @param state
+     *            checkpoint data as a byte array
+     * @param callback
+     *            callback for receiving notifications of storage operations.
+     *            This callback is configurable
+     */
+    void saveState(SafeKeeperId key, byte[] state, StorageCallback callback);
+
+    /**
+     * Fetches data for a stored checkpoint.
+     * <p>
+     * Implementations must return null when the storage does not hold this key.
+     * </p>
+     *
+     * @param key
+     *            safeKeeperId for this checkpoint
+     *
+     * @return stored checkpoint data, or null if the storage does not contain
+     *         data for the given key
+     */
+    byte[] fetchState(SafeKeeperId key);
+
+    /**
+     * Fetches all stored safeKeeper Ids.
+     *
+     * @return all stored safeKeeper Ids.
+     */
+    Set<SafeKeeperId> fetchStoredKeys();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java b/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java
new file mode 100644
index 0000000..605597f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+/**
+ * Callback for reporting the result of an asynchronous storage operation.
+ */
+public interface StorageCallback {
+
+    /**
+     * Notifies the result of a storage operation.
+     *
+     * @param resultCode
+     *            code for the result :
+     *            {@link SafeKeeper.StorageResultCode SafeKeeper.StorageResultCode}
+     * @param message
+     *            whatever message object is suitable
+     */
+    void storageOperationResult(SafeKeeper.StorageResultCode resultCode, Object message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java b/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java
new file mode 100644
index 0000000..a402ac9
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.ft;
+
+/**
+ * A factory for creating storage callbacks. Storage callback implementations
+ * can take specific actions upon success or failure of asynchronous storage
+ * operations.
+ *
+ */
+public interface StorageCallbackFactory {
+
+    /**
+     * Factory method
+     *
+     * @return a StorageCallback instance
+     */
+    StorageCallback createStorageCallback();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/package.html
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/package.html b/s4-core/src/main/java/org/apache/s4/ft/package.html
new file mode 100644
index 0000000..37fe51d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/package.html
@@ -0,0 +1,23 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+<head>
+</head>
+<body bgcolor="white">
+	<p>This package contains classes for providing some fault tolerance
+		to S4 PEs.</p>
+	<p>The current approach is based on checkpointing.</p>
+	<p>Checkpoints are taken periodically (configurable by time or
+		frequency of application events), and when restarting an S4 node,
+		saved checkpoints are automatically and lazily restored.</p>
+	<p>Lazy restoration is triggered by an application event to a PE
+		that has not yet been restored.</p>
+	<p>Checkpoints are stored in storage backends. Storage backends may
+		implement eager techniques to prefetch checkpoint data to be
+		recovered.</p>
+	<p>
+		The application programmer must take care of marking as <b>transient</b>
+		the fields that do not have to be persisted (or cannot be persisted).</p>
+	<p>Storage backends are pluggable and we provide some default
+		implementations in this package.</p>
+</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java b/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java
new file mode 100644
index 0000000..a67153c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.listener;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.low_level_listener_badmsg_ct;
+import static org.apache.s4.util.MetricsName.low_level_listener_msg_drop_ct;
+import static org.apache.s4.util.MetricsName.low_level_listener_msg_in_ct;
+import static org.apache.s4.util.MetricsName.low_level_listener_qsz;
+import static org.apache.s4.util.MetricsName.s4_core_exit_ct;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.CommLayerState;
+import org.apache.s4.comm.core.Deserializer;
+import org.apache.s4.comm.core.ListenerProcess;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Event listener backed by the communication layer.
+ * <p>
+ * {@link #init()} starts one raw listener thread (this object's
+ * {@link #run()}) and a number of {@link Dequeuer} threads. The raw listener
+ * acquires a task from the {@link ListenerProcess}, then offers every received
+ * message to a bounded queue; messages are dropped when the queue is full.
+ * Dequeuer threads take messages from that queue, deserialize them into
+ * {@link EventWrapper}s and pass them to every registered
+ * {@link EventHandler}.
+ * <p>
+ * The {@link Monitor} is optional: metrics are recorded only when one has
+ * been set.
+ */
+public class CommLayerListener implements EventListener, Runnable {
+    private static Logger logger = Logger.getLogger(CommLayerListener.class);
+    // number of Dequeuer threads; overridable with -DDequeuerCount=<n>
+    private int dequeuerCount = 12;
+    // NOTE(review): plain HashSet that is iterated by every Dequeuer thread;
+    // presumably handlers are all registered before init() is called -
+    // confirm, or switch to a concurrent set.
+    private Set<EventHandler> handlers = new HashSet<EventHandler>();
+    ListenerProcess process;
+    private BlockingQueue<Object> messageQueue;
+    // capacity of messageQueue; only effective if set before init()
+    private int maxQueueSize = 1000;
+    private String clusterManagerAddress;
+    private String appName;
+    private Object listenerConfig;
+    private Monitor monitor;
+    // -1 until a task with a "partition" entry has been acquired in run()
+    private int partitionId = -1;
+    private int zkConnected = 1;
+    private SerializerDeserializer serDeser;
+
+    public void setSerDeser(SerializerDeserializer serDeser) {
+        this.serDeser = serDeser;
+    }
+
+    /**
+     * Sets the monitor and registers the current partition id as its default
+     * "tid" value.
+     * 
+     * @param monitor
+     *            monitor to report metrics to; must not be null
+     */
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+        monitor.setDefaultValue("tid", partitionId);
+    }
+
+    public void setMaxQueueSize(int maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    /**
+     * @return the partition id, or -1 if none has been assigned yet
+     */
+    @Override
+    public int getId() {
+        return partitionId;
+    }
+
+    @Override
+    public String getAppName() {
+        return appName;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
+    }
+
+    public String getClusterManagerAddress() {
+        return clusterManagerAddress;
+    }
+
+    public void setClusterManagerAddress(String clusterManagerAddress) {
+        this.clusterManagerAddress = clusterManagerAddress;
+    }
+
+    @Override
+    public void addHandler(EventHandler handler) {
+        handlers.add(handler);
+    }
+
+    @Override
+    public boolean removeHandler(EventHandler handler) {
+        return handlers.remove(handler);
+    }
+
+    /**
+     * @return the configuration returned when the task was acquired, or null
+     *         if no task has been acquired yet
+     */
+    public Object getListenerConfig() {
+        return this.listenerConfig;
+    }
+
+    /**
+     * Creates the listener process, installs the communication layer state
+     * callback, creates the message queue and starts the raw listener thread
+     * and the dequeuer threads.
+     * <p>
+     * When the communication layer reports {@link CommLayerState#BROKEN} the
+     * JVM exits with status 3 so that the process can be restarted.
+     */
+    public void init() {
+        System.err.println("appName=" + appName);
+        process = new ListenerProcess(clusterManagerAddress, appName);
+        process.setDeserializer(new PassThroughDeserializer());
+        CommEventCallback callbackHandler = new CommEventCallback() {
+            @Override
+            public void handleCallback(Map<String, Object> event) {
+                if (event != null) {
+                    CommLayerState state = (CommLayerState) event.get("state");
+                    if (state != null) {
+                        if (state == CommLayerState.INITIALIZED) {
+                            logger.info("Communication layer initialized: source:"
+                                    + event.get("source"));
+                        } else if (state == CommLayerState.BROKEN) {
+                            logger.error("Communication layer broken: source:"
+                                    + event.get("source"));
+                            logger.error("System exiting so that process can restart.");
+                            if (monitor != null) {
+                                monitor.set(s4_core_exit_ct.toString(),
+                                            1,
+                                            S4_CORE_METRICS.toString());
+                                // should flush stats before exiting; kept
+                                // inside the null check so that a missing
+                                // monitor cannot prevent the exit below
+                                monitor.flushStats();
+                            }
+                            try {
+                                Thread.sleep(1000);
+                            } catch (InterruptedException e) {
+                                // exiting anyway; just keep the interrupt
+                                // status instead of swallowing it
+                                Thread.currentThread().interrupt();
+                            }
+                            System.exit(3);
+                        }
+                    }
+                }
+            }
+        };
+        process.setCallbackHandler(callbackHandler);
+
+        messageQueue = new LinkedBlockingQueue<Object>(maxQueueSize);
+
+        // listenerConfig = process.acquireTaskAndCreateListener(map);
+        Thread t = new Thread(this);
+        t.setPriority(Thread.MAX_PRIORITY);
+        t.start();
+
+        if (System.getProperty("DequeuerCount") != null) {
+            dequeuerCount = Integer.parseInt(System.getProperty("DequeuerCount"));
+        }
+
+        System.out.println("dequeuer number: " + dequeuerCount);
+
+        for (int i = 0; i < dequeuerCount; i++) {
+            t = new Thread(new Dequeuer(this, i));
+            // t.setPriority(Thread.MIN_PRIORITY);
+            t.start();
+        }
+    }
+
+    // This is the actual raw listener, which simply listens for messages on the
+    // socket
+    public void run() {
+        boolean isAddMessageSucceeded = false;
+        // acquire a task to do
+        synchronized (this) {
+            Map<String, String> map = new HashMap<String, String>();
+            try {
+                map.put("ListenerId", InetAddress.getLocalHost().getHostName()
+                        + "_" + System.getProperty("pid") + "_"
+                        + Thread.currentThread().getId());
+                map.put("address", InetAddress.getLocalHost().getHostAddress());
+            } catch (UnknownHostException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e);
+            }
+            logger.info("Waiting to acquire task");
+            listenerConfig = process.acquireTaskAndCreateListener(map);
+            logger.info("acquired task with config:" + listenerConfig);
+            Map<String, String> configMap = (Map<String, String>) listenerConfig;
+            String partition = configMap.get("partition");
+            if (partition != null) {
+                partitionId = Integer.parseInt(partition);
+                // the monitor is optional everywhere else in this class, so
+                // its absence must not kill the listener thread here
+                if (monitor != null) {
+                    monitor.setDefaultValue("tid", partitionId);
+                }
+                logger.info("tid is set to " + partitionId);
+            }
+            this.notify();
+        }
+        while (!Thread.interrupted()) {
+            byte[] message = (byte[]) process.listen();
+
+            try {
+                // offer() does not block: the message is dropped when the
+                // queue is full, and counted as such below
+                isAddMessageSucceeded = messageQueue.offer(message);
+                if (monitor != null) {
+                    monitor.set(low_level_listener_qsz.toString(),
+                                messageQueue.size(),
+                                S4_CORE_METRICS.toString());
+                    if (isAddMessageSucceeded) {
+                        monitor.increment(low_level_listener_msg_in_ct.toString(),
+                                          1,
+                                          S4_CORE_METRICS.toString());
+                    } else {
+                        monitor.increment(low_level_listener_msg_drop_ct.toString(),
+                                          1,
+                                          S4_CORE_METRICS.toString());
+                    }
+                }
+            } catch (Exception e) {
+                Logger.getLogger("s4")
+                      .error("Exception in monitor metrics on thread "
+                                     + Thread.currentThread().getId(),
+                             e);
+            }
+        }
+    }
+
+    /**
+     * Takes the next raw message from the queue, blocking until one is
+     * available.
+     * 
+     * @return the next queued message
+     * @throws InterruptedException
+     *             if interrupted while waiting
+     */
+    public Object takeMessage() throws InterruptedException {
+        return messageQueue.take();
+    }
+
+    /**
+     * Worker that takes raw messages from the listener's queue, deserializes
+     * them and hands the resulting events to the registered handlers.
+     */
+    class Dequeuer implements Runnable {
+        private int id;
+        private CommLayerListener rawListener;
+
+        public Dequeuer(CommLayerListener rawListener, int id) {
+            this.id = id;
+            this.rawListener = rawListener;
+        }
+
+        public void run() {
+            while (!Thread.interrupted()) {
+                try {
+                    byte[] rawMessage = (byte[]) rawListener.takeMessage();
+                    processMessage(rawMessage);
+                } catch (InterruptedException ie) {
+                    // restore the flag so that the loop condition ends the
+                    // thread
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+
+        /**
+         * Deserializes one raw message and passes the event to every handler.
+         * A message that cannot be deserialized is logged, counted as a bad
+         * message and discarded; an exception thrown by one handler is logged
+         * and does not prevent the remaining handlers from being called.
+         * 
+         * @param rawMessage
+         *            serialized event
+         */
+        public void processMessage(byte[] rawMessage) {
+            // convert the byte array into an event object
+            EventWrapper eventWrapper = null;
+            try {
+                eventWrapper = (EventWrapper) serDeser.deserialize(rawMessage);
+
+            } catch (RuntimeException rte) {
+                Logger.getLogger("s4")
+                      .error("Error converting message to an event: ", rte);
+                if (monitor != null) {
+                    monitor.increment(low_level_listener_badmsg_ct.toString(),
+                                      1,
+                                      S4_CORE_METRICS.toString());
+                }
+                return;
+            }
+
+            if (eventWrapper != null) {
+                for (EventHandler handler : handlers) {
+                    try {
+                        handler.processEvent(eventWrapper);
+                    } catch (Exception e) {
+                        Logger.getLogger("s4")
+                              .error("Error calling processEvent on handler", e);
+                    }
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Deserializer that returns its input unchanged, so that the raw bytes
+     * reach the queue and are deserialized later by the dequeuer threads.
+     */
+    public class PassThroughDeserializer implements Deserializer {
+        public Object deserialize(byte[] input) {
+            return input;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java b/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java
new file mode 100644
index 0000000..b6ad884
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.listener;
+
+import org.apache.s4.collector.EventWrapper;
+
+/**
+ * Receiver of events. Handlers are registered with an {@link EventProducer}
+ * through {@link EventProducer#addHandler(EventHandler)}.
+ */
+public interface EventHandler {
+    /**
+     * Processes one event.
+     * 
+     * @param eventWrapper
+     *            the event to process
+     */
+    void processEvent(EventWrapper eventWrapper);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/EventListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/EventListener.java b/s4-core/src/main/java/org/apache/s4/listener/EventListener.java
new file mode 100644
index 0000000..2e2cd1d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/EventListener.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.listener;
+
+/**
+ * An {@link EventProducer} that additionally exposes an id and the name of
+ * the application it belongs to.
+ */
+public interface EventListener extends EventProducer {
+
+    /**
+     * @return the id of this listener (CommLayerListener returns its
+     *         partition id)
+     */
+    int getId();
+
+    /**
+     * @return the application name
+     */
+    String getAppName();
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java b/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java
new file mode 100644
index 0000000..2065332
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.listener;
+
+/**
+ * Source of events to which {@link EventHandler}s can be attached.
+ */
+public interface EventProducer {
+
+    /**
+     * Registers a handler with this producer.
+     * 
+     * @param handler
+     *            handler to add
+     */
+    void addHandler(EventHandler handler);
+
+    /**
+     * Unregisters a handler.
+     * 
+     * @param handler
+     *            handler to remove
+     * @return presumably true if the handler was registered (CommLayerListener
+     *         returns the result of Set.remove) - confirm for other
+     *         implementations
+     */
+    boolean removeHandler(EventHandler handler);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java b/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java
new file mode 100644
index 0000000..d9008fe
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link Monitor} implementation that accumulates integer metrics in memory
+ * and writes them to a log4j logger.
+ * <p>
+ * Each flush logs every metric as {@code name = value} at INFO level and
+ * removes it from the map. When {@link #init()} is called with a positive
+ * flush interval, a {@link Timer} flushes at that fixed rate.
+ * <p>
+ * Metric updates and flushes may come from different threads, so all updates
+ * of the metric map are atomic.
+ */
+public class Log4jMonitor extends TimerTask implements Monitor {
+    ConcurrentHashMap<String, Integer> metricMap = new ConcurrentHashMap<String, Integer>();
+    private String loggerName = "s4";
+    private int flushInterval = 600; // in seconds; default is every 10 minutes
+
+    private Timer timer = new Timer();
+    // values re-applied after every flush; nothing in this class fills it,
+    // see setDefaultValue()
+    private Map<String, Integer> defaultMap = new HashMap<String, Integer>();
+
+    public void setLoggerName(String loggerName) {
+        this.loggerName = loggerName;
+    }
+
+    /**
+     * @param flushInterval
+     *            seconds between flushes; a value of 0 or less disables
+     *            periodic flushing in {@link #init()}
+     */
+    public void setFlushInterval(int flushInterval) {
+        this.flushInterval = flushInterval;
+    }
+
+    /**
+     * Schedules the periodic flush if the flush interval is positive.
+     */
+    public void init() {
+        if (flushInterval > 0) {
+            // long arithmetic: seconds * 1000 overflows int for large
+            // intervals
+            long periodMillis = flushInterval * 1000L;
+            timer.scheduleAtFixedRate(this, periodMillis, periodMillis);
+        }
+    }
+
+    // TODO: this will be removed after changing above functions
+    public void set(String metricName, int value) {
+        metricMap.put(metricName, value);
+    }
+
+    /**
+     * Logs and clears all current metrics, then re-applies the default
+     * values.
+     */
+    public void flushStats() {
+        org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(loggerName);
+        for (String key : metricMap.keySet()) {
+            // remove() hands back the value atomically, so an update cannot
+            // slip in between reading the value and discarding the entry
+            Integer value = metricMap.remove(key);
+            if (value != null) {
+                String message = key + " = " + value;
+                logger.info(message);
+            }
+        }
+        if (defaultMap != null) {
+            for (String key : defaultMap.keySet()) {
+                // TODO: need to be changed
+                set(key, defaultMap.get(key));
+            }
+        }
+    }
+
+    /** Timer entry point: flushes the metrics. */
+    public void run() {
+        flushStats();
+    }
+
+    /**
+     * Adds {@code increment} to the named metric, starting from 0 if the
+     * metric does not exist yet.
+     */
+    @Override
+    public void increment(String metricName, int increment) {
+        // atomic read-modify-write: retry until our update wins against
+        // concurrent increments, sets and flushes
+        for (;;) {
+            Integer current = metricMap.putIfAbsent(metricName, increment);
+            if (current == null) {
+                return; // first value for this metric
+            }
+            if (metricMap.replace(metricName, current, current + increment)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Not implemented: the value is ignored.
+     * <p>
+     * NOTE(review): flushStats() re-applies the entries of defaultMap after
+     * every flush, which suggests this method was meant to store the value
+     * there - confirm the intended behaviour before implementing.
+     */
+    @Override
+    public void setDefaultValue(String key, int val) {
+
+    }
+
+    @Override
+    public void increment(String metricName, int increment, String metricEventName, String... furtherDistinctions) {
+        increment(buildMetricName(metricName,
+                                  metricEventName,
+                                  furtherDistinctions),
+                  increment);
+
+    }
+
+    @Override
+    public void set(String metricName, int value, String metricEventName, String... furtherDistinctions) {
+        metricMap.put(buildMetricName(metricName,
+                                      metricEventName,
+                                      furtherDistinctions),
+                      value);
+    }
+
+    /**
+     * Builds the map key
+     * {@code metricEventName:metricName[:distinction...]}.
+     */
+    private String buildMetricName(String metricName, String metricEventName, String[] furtherDistinctions) {
+        StringBuilder sb = new StringBuilder(metricEventName);
+        sb.append(":");
+        sb.append(metricName);
+        if (furtherDistinctions != null) {
+            for (String furtherDistinction : furtherDistinctions) {
+                sb.append(":");
+                sb.append(furtherDistinction);
+            }
+        }
+        return sb.toString().intern();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/logger/Monitor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/logger/Monitor.java b/s4-core/src/main/java/org/apache/s4/logger/Monitor.java
new file mode 100644
index 0000000..1da30ad
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/logger/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.logger;
+
+/**
+ * Sink for integer metrics.
+ * <p>
+ * Metrics are identified by a name, optionally qualified by a metric event
+ * name and further aggregation keys; how these are combined is up to the
+ * implementation.
+ */
+public interface Monitor {
+    /** Adds {@code increment} to the metric qualified by event name and keys. */
+    public void increment(String metricName, int increment, String metricEventName, String... aggKeys);
+
+    /** Adds {@code increment} to the named metric. */
+    public void increment(String metricName, int increment);
+
+    /** Sets the metric qualified by event name and keys to {@code value}. */
+    public void set(String metricName, int value, String metricEventName, String... aggKeys);
+
+    /** Sets the named metric to {@code value}. */
+    public void set(String metricName, int value);
+
+    /** Writes out the metrics collected so far. */
+    public void flushStats();
+
+    /**
+     * Declares a default value for a key.
+     * NOTE(review): the exact semantics are not visible here; Log4jMonitor
+     * currently ignores this call - confirm.
+     */
+    public void setDefaultValue(String key, int val);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java b/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java
new file mode 100644
index 0000000..71ab28f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Trace record: a numeric trace id plus free-form string properties.
+ */
+public class TraceMessage {
+    private long traceId;
+    private Map<String, String> propertyMap = new HashMap<String, String>();
+
+    public long getTraceId() {
+        return traceId;
+    }
+
+    public void setTraceId(long traceId) {
+        this.traceId = traceId;
+    }
+
+    /** Stores {@code value} under {@code name}, replacing any earlier value. */
+    public void setProperty(String name, String value) {
+        propertyMap.put(name, value);
+    }
+
+    /** Renders the message as the trace id, "; ", then the property map. */
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append(traceId);
+        text.append("; ");
+        text.append(propertyMap.toString());
+        return text.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java b/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java
new file mode 100644
index 0000000..20744e7
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.processor.PrototypeWrapper;
+import org.apache.s4.util.MethodInvoker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Request that asks the prototype of a PE for one or more values.
+ */
+public class PrototypeRequest extends Request {
+
+    private final List<String> query;
+
+    /** Creates a request with an empty query list and no return information. */
+    public PrototypeRequest() {
+        this(Collections.<String> emptyList(), null);
+    }
+
+    /** Creates a request for the given queries with no return information. */
+    public PrototypeRequest(List<String> query) {
+        this(query, null);
+    }
+
+    /** Creates a request for the given queries and return information. */
+    public PrototypeRequest(List<String> query, RInfo info) {
+        this.query = query;
+        this.rinfo = info;
+    }
+
+    public String toString() {
+        StringBuilder text = new StringBuilder("PROTOTYPE: query=[");
+        text.append(query).append("] info=[").append(rinfo).append("]");
+        return text.toString();
+    }
+
+    /**
+     * The queries carried by this request.
+     * 
+     * @return list of queries
+     */
+    public List<String> getQuery() {
+        return query;
+    }
+
+    /**
+     * Runs every query against a PE prototype.
+     * <p>
+     * A query of the form {@code $fieldA} is answered by invoking the getter
+     * for {@code fieldA} on the prototype; {@code count} (any case) is
+     * answered with the prototype's PE count. A getter failure is recorded
+     * as that query's exception text, and any other query is recorded as
+     * "Query Parse Error".
+     * 
+     * @param pw
+     *            prototype
+     * @return Response holding the results and the per-query errors.
+     */
+    public Response evaluate(PrototypeWrapper pw) {
+        HashMap<String, Object> values = new HashMap<String, Object>();
+        HashMap<String, String> failures = new HashMap<String, String>();
+
+        for (String q : query) {
+            if (q.startsWith("$")) {
+                // getter request: strip the leading '$' to get the field name
+                try {
+                    values.put(q, MethodInvoker.invokeGetter(pw, q.substring(1)));
+                } catch (Exception e) {
+                    failures.put(q, e.toString());
+                }
+                continue;
+            }
+
+            if (q.equalsIgnoreCase("count")) {
+                // aggregate operator
+                values.put(q, pw.getPECount());
+                continue;
+            }
+
+            failures.put(q, "Query Parse Error");
+        }
+
+        return new Response(values, failures, this);
+    }
+
+    /**
+     * Addresses the request to all partitions: returns one entry per
+     * partition id from 0 to {@code partCount - 1}. The hasher and the
+     * delimiter are not used.
+     */
+    public List<CompoundKeyInfo> partition(Hasher h, String delim, int partCount) {
+        List<CompoundKeyInfo> targets = new ArrayList<CompoundKeyInfo>();
+
+        int partitionId = 0;
+        while (partitionId < partCount) {
+            CompoundKeyInfo target = new CompoundKeyInfo();
+            target.setPartitionId(partitionId);
+            targets.add(target);
+            partitionId++;
+        }
+
+        return targets;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/Request.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/Request.java b/s4-core/src/main/java/org/apache/s4/message/Request.java
new file mode 100644
index 0000000..c6cae68
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/Request.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.util.GsonUtil;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.gson.InstanceCreator;
+
+/**
+ * Base class of requests. A request carries return information
+ * ({@link RInfo}) describing where it came from and where the response
+ * should go, and knows how to partition itself.
+ */
+abstract public class Request {
+
+    // defaults to the shared NullRInfo instance until set otherwise
+    protected RInfo rinfo = nullRInfo;
+
+    public final static RInfo nullRInfo = new NullRInfo();
+
+    /**
+     * Requester/Return information
+     */
+    abstract public static class RInfo {
+
+        private long id = 0;
+
+        /**
+         * Identity of request. This is typically specified by the requester.
+         */
+        public long getId() {
+            return id;
+        }
+
+        /**
+         * Sets the request id. Kept for existing callers; see
+         * {@link #setId(long)} for the full value range.
+         */
+        public void setId(int id) {
+            setId((long) id);
+        }
+
+        /**
+         * Sets the request id. The id is stored and returned as a long, so
+         * this overload allows values beyond the int range.
+         */
+        public void setId(long id) {
+            this.id = id;
+        }
+
+        private String stream;
+
+        /**
+         * Stream name on which response should be sent.
+         * 
+         * @return stream name.
+         */
+        public String getStream() {
+            return stream;
+        }
+
+        public void setStream(String stream) {
+            this.stream = stream;
+        }
+
+        private int partition;
+
+        /**
+         * Partition Id from which this request originated. This may be used to
+         * return a response to the same partition.
+         * 
+         * @return partition id
+         */
+        public int getPartition() {
+            return partition;
+        }
+
+        public void setPartition(int partition) {
+            this.partition = partition;
+        }
+
+        // Tell Gson how to instantiate one of these: create a ClientRInfo
+        static {
+            InstanceCreator<RInfo> creator = new InstanceCreator<RInfo>() {
+                public org.apache.s4.message.Request.RInfo createInstance(Type type) {
+                    return new org.apache.s4.message.Request.ClientRInfo();
+                }
+            };
+
+            GsonUtil.registerTypeAdapter(RInfo.class, creator);
+        }
+        
+    }
+
+    /**
+     * Return information for a request issued by a client.
+     */
+    public static class ClientRInfo extends RInfo {
+        private UUID requesterUUID = null;
+
+        /**
+         * Identity of requesting client. This is used to send the response back
+         * to the client.
+         * 
+         * @return UUID of the client from which the request originated.
+         */
+        public UUID getRequesterUUID() {
+            return requesterUUID;
+        }
+
+        public void setRequesterUUID(UUID requesterUUID) {
+            this.requesterUUID = requesterUUID;
+        }
+
+        public String toString() {
+            return "(id:" + getId() + " requester:" + getRequesterUUID()
+                    + " partition:" + getPartition() + " stream:" + getStream()
+                    + ")";
+        }
+    }
+
+    /**
+     * Return information for a request issued by a PE.
+     */
+    public static class PERInfo extends RInfo {
+        private String requesterKey = null;
+
+        /**
+         * Identity of requesting PE. This is used to route the response back to
+         * the originating PE.
+         * 
+         * @return key value of the PE from which the request originated.
+         */
+        public String getRequesterKey() {
+            return requesterKey;
+        }
+
+        public void setRequesterKey(String requesterKey) {
+            this.requesterKey = requesterKey;
+        }
+
+        public String toString() {
+            return "(id:" + getId() + " requester:" + getRequesterKey()
+                    + " partition:" + getPartition() + " stream:" + getStream()
+                    + ")";
+        }
+    }
+
+    /**
+     * Placeholder return information: stream "@null" and partition -1.
+     */
+    public static class NullRInfo extends RInfo {
+        public NullRInfo() {
+            super.stream = "@null";
+            super.partition = -1;
+        }
+    }
+
+    /**
+     * Query metainformation.
+     * 
+     * @return Info representing origin of request.
+     */
+    public RInfo getRInfo() {
+        return rinfo;
+    }
+
+    /**
+     * Query metainformation.
+     */
+    public void setRInfo(RInfo rinfo) {
+        this.rinfo = rinfo;
+    }
+
+    /**
+     * Partition itself. This is used by the default partitioner.
+     * 
+     * @param h
+     *            hasher
+     * @param delim
+     *            delimiter used to concatenate compound key values
+     * @param partCount
+     *            number of partitions
+     * @return list of compound keys: one event may have to be sent to multiple
+     *         nodes.
+     */
+    abstract public List<CompoundKeyInfo> partition(Hasher h, String delim,
+                                                    int partCount);
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/Response.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/Response.java b/s4-core/src/main/java/org/apache/s4/message/Response.java
new file mode 100644
index 0000000..cf01d95
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/Response.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reply to a {@link Request}. It carries the values that were read, the
+ * per-query error descriptions (if any) and the request being answered.
+ */
+public class Response {
+
+    private Map<String, Object> result;
+
+    private Map<String, String> exception;
+
+    private Request request;
+
+    /**
+     * Creates a response without error information.
+     * 
+     * @param result
+     *            map from query strings to corresponding values
+     * @param request
+     *            request being answered
+     */
+    public Response(Map<String, Object> result, Request request) {
+        this.result = result;
+        this.request = request;
+    }
+
+    /**
+     * Creates a response with results and error information.
+     * 
+     * @param result
+     *            map from query strings to corresponding values
+     * @param exception
+     *            map from query strings to a description of the failure
+     * @param request
+     *            request being answered
+     */
+    public Response(Map<String, Object> result, Map<String, String> exception,
+            Request request) {
+        this.result = result;
+        this.exception = exception;
+        this.request = request;
+    }
+
+    /**
+     * Creates an empty response: result, exception and request are all null.
+     */
+    public Response() {
+        result = null;
+        exception = null;
+        request = null;
+    }
+
+    /**
+     * Result of a request.
+     * 
+     * @return map from query strings to corresponding values.
+     */
+    public Map<String, Object> getResult() {
+        return result;
+    }
+
+    /**
+     * Errors raised while the request was evaluated.
+     * 
+     * @return map from query strings to failure descriptions; null when the
+     *         response was built without error information.
+     */
+    public Map<String, String> getException() {
+        return exception;
+    }
+
+    /**
+     * @return the request this response answers; may be null.
+     */
+    public Request getRequest() {
+        return request;
+    }
+
+    /**
+     * @return origin information of the answered request, or null when this
+     *         response has no request.
+     */
+    public Request.RInfo getRInfo() {
+        return (request != null ? request.getRInfo() : null);
+    }
+
+    @Override
+    public String toString() {
+        return "[" + result + "] (" + request + ")";
+    }
+
+    /**
+     * Determines where this response has to be sent. The partition id is
+     * taken from the request info object.
+     * 
+     * @param partCount
+     *            number of partitions
+     * @return single-element list holding the target partition, or null when
+     *         no request info is available or its partition id is outside
+     *         {@code [0, partCount)}.
+     */
+    public List<CompoundKeyInfo> partition(int partCount) {
+        Request.RInfo info = this.getRInfo();
+        if (info == null) {
+            // no request (or a request without origin info): there is no
+            // partition to route to, which is reported like an invalid one
+            return null;
+        }
+
+        int p = info.getPartition();
+        if (p < 0 || p >= partCount) {
+            return null;
+        }
+
+        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+        partitionInfo.setPartitionId(p);
+
+        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+        partitionInfoList.add(partitionInfo);
+        return partitionInfoList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java b/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java
new file mode 100644
index 0000000..b2c1551
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.util.MethodInvoker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.springframework.util.StringUtils;
+
+/**
+ * A request for a value from a particular PE.
+ */
+public class SinglePERequest extends Request {
+
+    private final List<String> target;
+
+    private final List<String> query;
+
+    /**
+     * @param target
+     *            values identifying the PE to query. Order matters.
+     * @param query
+     *            names of the fields to read from the target PE
+     * @param info
+     *            origin information for this request
+     */
+    public SinglePERequest(List<String> target, List<String> query, RInfo info) {
+        this.target = target;
+        this.query = query;
+        this.rinfo = info;
+    }
+
+    /**
+     * Creates a request without origin information.
+     * 
+     * @param target
+     *            values identifying the PE to query. Order matters.
+     * @param query
+     *            names of the fields to read from the target PE
+     */
+    public SinglePERequest(List<String> target, List<String> query) {
+        this.target = target;
+        this.query = query;
+        this.rinfo = null;
+    }
+
+    /**
+     * Creates a request with empty target and query lists and no origin
+     * information.
+     */
+    public SinglePERequest() {
+        this.target = Collections.<String> emptyList();
+        this.query = Collections.<String> emptyList();
+        this.rinfo = null;
+    }
+
+    @Override
+    public String toString() {
+        return "target:" + target + " query:" + query + " info:" + rinfo;
+    }
+
+    /**
+     * Fields used to target a particular PE.
+     * 
+     * @return list of targeting values. Order matters.
+     */
+    public List<String> getTarget() {
+        return target;
+    }
+
+    /**
+     * List of field names that have to be read from target PE.
+     * 
+     * @return list of field name strings.
+     */
+    public List<String> getQuery() {
+        return query;
+    }
+
+    /**
+     * Evaluate Request on a particular PE.
+     * 
+     * Only queries of the form {@code $field} are evaluated; every other
+     * entry (including null) is skipped. A failure while reading one field
+     * is recorded under that query and does not stop the remaining ones.
+     * 
+     * @param pe
+     *            PE whose getters are invoked
+     * @return Response object holding the values read and, per failed query,
+     *         the string form of the exception. Both maps are empty when the
+     *         query list is null or empty.
+     */
+    public Response evaluate(AbstractPE pe) {
+
+        HashMap<String, Object> results = new HashMap<String, Object>();
+        HashMap<String, String> exceptions = new HashMap<String, String>();
+
+        // a request built with a null query list reads nothing
+        if (query != null) {
+            for (String q : query) {
+                // requests for getters should be of the form $field. Responds
+                // with pe.getField()
+                if (q == null || !q.startsWith("$")) {
+                    continue;
+                }
+                try {
+                    Object res = MethodInvoker.invokeGetter(pe, q.substring(1));
+                    results.put(q, res);
+                } catch (Exception e) {
+                    exceptions.put(q, e.toString());
+                }
+            }
+        }
+
+        return new Response(results, exceptions, this);
+    }
+
+    /**
+     * Derives the partition of the targeted PE from the target values.
+     * 
+     * @param h
+     *            hasher applied to the concatenated target values
+     * @param delim
+     *            delimiter used to concatenate the target values
+     * @param partCount
+     *            number of partitions
+     * @return single-element list describing the target partition, or null
+     *         when this request has no target list.
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(Hasher h, String delim, int partCount) {
+        List<String> valueList = this.getTarget();
+        if (valueList == null)
+            return null;
+
+        // First, build the key
+        KeyInfo keyInfo = new KeyInfo();
+        // special key name to denote request
+        keyInfo.addElementToPath("#req");
+
+        // for value, concatenate list of values from Request's target field.
+        String stringValue = StringUtils.collectionToDelimitedString(valueList,
+                                                                     delim);
+        keyInfo.setValue(stringValue);
+
+        // partition id is derived from string value, as usual
+        // NOTE(review): a negative hash value would yield a negative
+        // partition id - confirm the Hasher contract.
+        int partitionId = (int) (h.hash(stringValue) % partCount);
+
+        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+        partitionInfo.addKeyInfo(keyInfo);
+        partitionInfo.setCompoundValue(stringValue);
+        partitionInfo.setPartitionId(partitionId);
+
+        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+        partitionInfoList.add(partitionInfo);
+
+        return partitionInfoList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
new file mode 100644
index 0000000..46dd19b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.persist;
+
+import org.apache.s4.util.clock.Clock;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Persister that keeps its entries in an in-memory
+ * {@link ConcurrentHashMap}. Entries may carry an expiration period; expired
+ * entries are hidden from readers and can optionally be removed by a
+ * background cleaner thread.
+ * 
+ * {@link #init()} must be called, and a clock must be supplied, before
+ * entries are stored or read.
+ */
+public class ConMapPersister implements Persister {
+    private AtomicInteger persistCount = new AtomicInteger(0);
+    private boolean selfClean = false;
+    private int cleanWaitTime = 40; // seconds between two cleaning passes
+    private String loggerName = "s4";
+    ConcurrentHashMap<String, CacheEntry> cache;
+    Clock s4Clock;
+
+    private int startCapacity = 5000;
+
+    /**
+     * @param startCapacity
+     *            initial capacity of the map created by {@link #init()}
+     */
+    public void setStartCapacity(int startCapacity) {
+        this.startCapacity = startCapacity;
+    }
+
+    public int getStartCapacity() {
+        return startCapacity;
+    }
+
+    /**
+     * @param selfClean
+     *            whether {@link #init()} starts a thread that periodically
+     *            removes expired entries
+     */
+    public void setSelfClean(boolean selfClean) {
+        this.selfClean = selfClean;
+    }
+
+    /**
+     * @param cleanWaitTime
+     *            pause between two cleaning passes, in seconds
+     */
+    public void setCleanWaitTime(int cleanWaitTime) {
+        this.cleanWaitTime = cleanWaitTime;
+    }
+
+    /**
+     * @param loggerName
+     *            name of the logger used by the cleaner thread
+     */
+    public void setLoggerName(String loggerName) {
+        this.loggerName = loggerName;
+    }
+
+    public ConMapPersister(Clock s4Clock) {
+        this.s4Clock = s4Clock;
+    }
+
+    public void setS4Clock(Clock s4Clock) {
+        this.s4Clock = s4Clock;
+    }
+
+    public ConMapPersister() {
+    }
+
+    /**
+     * Creates the backing map and, when self cleaning is enabled, starts the
+     * low-priority cleaner thread.
+     */
+    public void init() {
+        cache = new ConcurrentHashMap<String, CacheEntry>(this.getStartCapacity());
+
+        if (!selfClean) {
+            return;
+        }
+
+        Runnable cleaner = new Runnable() {
+            public void run() {
+                while (!Thread.interrupted()) {
+                    int cleanCount = ConMapPersister.this.cleanOutGarbage();
+                    Logger.getLogger(loggerName).info("Cleaned out "
+                            + cleanCount + " entries; Persister has "
+                            + cache.size() + " entries");
+                    try {
+                        // long arithmetic: large wait times must not
+                        // overflow into a negative sleep duration
+                        Thread.sleep(cleanWaitTime * 1000L);
+                    } catch (InterruptedException ie) {
+                        // restore the flag so that the loop condition ends
+                        // the thread
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+        };
+        Thread t = new Thread(cleaner);
+        // the priority is set before the thread starts running
+        t.setPriority(Thread.MIN_PRIORITY);
+        t.start();
+    }
+
+    /**
+     * @return always 0: writes are applied immediately, nothing is queued.
+     */
+    public int getQueueSize() {
+        return 0;
+    }
+
+    /**
+     * @return number of non-null values stored so far.
+     */
+    public int getPersistCount() {
+        return persistCount.get();
+    }
+
+    /**
+     * @return number of entries currently held, expired ones included.
+     */
+    public int getCacheEntryCount() {
+        return cache.size();
+    }
+
+    public void setAsynch(String key, Object value, int period) {
+        // there really is no asynch for the local cache
+        set(key, value, period);
+    }
+
+    /**
+     * Stores a value, or removes the entry when the value is null.
+     * 
+     * @param key
+     *            entry key
+     * @param value
+     *            value to store; null removes the entry
+     * @param period
+     *            lifetime of the entry in seconds; a value of 0 or less
+     *            means the entry never expires
+     */
+    public void set(String key, Object value, int period) {
+        if (value == null) {
+            cache.remove(key);
+            return;
+        }
+        persistCount.getAndIncrement();
+        CacheEntry ce = new CacheEntry();
+        ce.value = value;
+        ce.period = period;
+        ce.addTime = s4Clock.getCurrentTime();
+        cache.put(key, ce);
+    }
+
+    /**
+     * @param key
+     *            entry key
+     * @return stored value, or null when the key is unknown or its entry has
+     *         expired.
+     */
+    public Object get(String key) {
+        CacheEntry ce = cache.get(key);
+        if (ce == null || ce.isExpired()) {
+            return null;
+        }
+        return ce.value;
+    }
+
+    /**
+     * @param keys
+     *            keys to look up
+     * @return map holding the keys for which a live value exists.
+     */
+    public Map<String, Object> getBulk(String[] keys) {
+        Map<String, Object> map = new HashMap<String, Object>();
+        for (String key : keys) {
+            Object value = get(key);
+            if (value != null) {
+                map.put(key, value);
+            }
+        }
+        return map;
+    }
+
+    public Object getObject(String key) {
+        return get(key);
+    }
+
+    public Map<String, Object> getBulkObjects(String[] keys) {
+        return getBulk(keys);
+    }
+
+    public void remove(String key) {
+        cache.remove(key);
+    }
+
+    /**
+     * Removes every expired entry.
+     * 
+     * @return number of entries removed by this call.
+     */
+    public int cleanOutGarbage() {
+        int count = 0;
+        for (Enumeration<String> en = cache.keys(); en.hasMoreElements();) {
+            String key = en.nextElement();
+            CacheEntry ce = cache.get(key);
+            // conditional remove: an entry that was replaced by a concurrent
+            // set() after the expiry check must not be deleted
+            if (ce != null && ce.isExpired() && cache.remove(key, ce)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    /**
+     * @return live view of the keys currently held, expired ones included.
+     */
+    public Set<String> keySet() {
+        return cache.keySet();
+    }
+
+    /**
+     * Stored value together with its insertion time and lifetime.
+     */
+    public class CacheEntry {
+        Object value;
+        long addTime;
+        int period;
+
+        /**
+         * @return true when the entry has a positive period and
+         *         {@code addTime + 1000 * period} is not later than the
+         *         current clock time.
+         */
+        public boolean isExpired() {
+            if (period <= 0) {
+                // entries without a positive period never expire
+                return false;
+            }
+            return (addTime + (1000 * (long) period)) <= s4Clock.getCurrentTime();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java b/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java
new file mode 100644
index 0000000..fca9d60
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.persist;
+
+import org.apache.s4.processor.OutputFormatter;
+import org.apache.s4.util.clock.Clock;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+/**
+ * In-memory persister that periodically dumps its entries to a file. Only
+ * entries whose key matches one of the configured regular expressions are
+ * written, each formatted by the formatter registered for that expression.
+ */
+public class DumpingPersister extends ConMapPersister implements Runnable {
+
+    private String dumpFilePrefix;
+    private Map<String, OutputFormatter> regexFormatter;
+    private Pattern[] patterns;
+    private OutputFormatter[] formatters;
+    private long outputTimeBoundary;
+
+    public DumpingPersister() {
+    }
+
+    public DumpingPersister(Clock s4Clock) {
+        super(s4Clock);
+    }
+
+    /**
+     * @param dumpFilePrefix
+     *            path prefix of the dump files; a random UUID is appended
+     *            for every dump
+     */
+    public void setDumpFilePrefix(String dumpFilePrefix) {
+        this.dumpFilePrefix = dumpFilePrefix;
+    }
+
+    /**
+     * @param regexFormatter
+     *            map from key regular expression to the formatter applied to
+     *            the values of matching keys
+     */
+    public void setRegexFormatter(Map<String, OutputFormatter> regexFormatter) {
+        this.regexFormatter = regexFormatter;
+    }
+
+    /**
+     * @param outputTimeBoundary
+     *            dump interval in seconds; dumps are aligned to multiples of
+     *            this interval. Must be positive.
+     */
+    public void setOutputTimeBoundary(long outputTimeBoundary) {
+        this.outputTimeBoundary = outputTimeBoundary;
+    }
+
+    /**
+     * Initializes the underlying persister, compiles the configured regular
+     * expressions and starts the dumping thread.
+     * 
+     * @throws IllegalStateException
+     *             when no regex/formatter map has been set
+     */
+    @Override
+    public void init() {
+        // checked first so that a misconfigured instance does not start any
+        // background thread
+        if (regexFormatter == null) {
+            throw new IllegalStateException(
+                    "regexFormatter must be set before init() is called");
+        }
+
+        super.init();
+
+        patterns = new Pattern[regexFormatter.size()];
+        formatters = new OutputFormatter[regexFormatter.size()];
+
+        int i = 0;
+        for (Map.Entry<String, OutputFormatter> entry : regexFormatter.entrySet()) {
+            patterns[i] = Pattern.compile(entry.getKey());
+            formatters[i] = entry.getValue();
+            i++;
+        }
+
+        Thread t = new Thread(this);
+        t.start();
+    }
+
+    /**
+     * Dump loop: sleeps until the next multiple of the output time boundary,
+     * then writes a dump. The loop ends once the thread is interrupted; a
+     * dump is still written for the interrupted wait.
+     */
+    public void run() {
+        long boundaryInMillis = outputTimeBoundary * 1000;
+        if (boundaryInMillis <= 0) {
+            // the boundary is used as a divisor below
+            Logger.getLogger("s4").error("Invalid output time boundary "
+                    + outputTimeBoundary + "; dumping is disabled");
+            return;
+        }
+
+        long currentTime = System.currentTimeMillis();
+        while (!Thread.interrupted()) {
+            long currentBoundary = (currentTime / boundaryInMillis)
+                    * boundaryInMillis;
+            long interval = ((currentBoundary + boundaryInMillis) - System.currentTimeMillis());
+            if (interval > 0) {
+                try {
+                    Thread.sleep(interval);
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+
+            currentTime = System.currentTimeMillis();
+
+            try {
+                output();
+            } catch (Exception e) {
+                Logger.getLogger("s4").error("Exception dumping persister", e);
+            }
+        }
+    }
+
+    /**
+     * Writes all live entries with a matching key to a new dump file, one
+     * {@code key = formattedValue} line per matching pattern. The calling
+     * thread runs at minimum priority while dumping.
+     * 
+     * @throws RuntimeException
+     *             when the file cannot be created or written
+     */
+    public void output() {
+        File file = new File(dumpFilePrefix + UUID.randomUUID().toString());
+        Logger.getLogger("s4").info("Dumping to " + file);
+
+        BufferedWriter dumpWriter = null;
+        try {
+            dumpWriter = new BufferedWriter(new OutputStreamWriter(
+                    new FileOutputStream(file)));
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+
+        int savedPriority = Thread.currentThread().getPriority();
+        try {
+            Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
+
+            // snapshot of the keys; the map may change while dumping
+            Set<String> keys = new HashSet<String>(this.keySet());
+
+            for (String key : keys) {
+                Object value = this.get(key);
+                if (value == null) {
+                    continue;
+                }
+                for (int patternId = 0; patternId < patterns.length; patternId++) {
+                    Matcher m = patterns[patternId].matcher(key);
+                    if (m.matches()) {
+                        Object formattedValue = formatters[patternId].format(value);
+                        dumpWriter.write(key + " = " + formattedValue + "\n");
+                    }
+                }
+            }
+
+            // flush here so that a failing write of buffered data is
+            // reported like any other write failure
+            dumpWriter.flush();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            Thread.currentThread().setPriority(savedPriority);
+            try {
+                // closes the wrapped writer and stream as well
+                dumpWriter.close();
+            } catch (IOException e) {
+                Logger.getLogger("s4").warn("Could not close dump file " + file, e);
+            }
+        }
+    }
+}