You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC
[37/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java b/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
new file mode 100644
index 0000000..d1a04b4
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+/**
+ *
+ * Event that triggers a checkpoint.
+ *
+ */
+public class InitiateCheckpointingEvent extends CheckpointingEvent {
+
+ public InitiateCheckpointingEvent() {
+ // as required by default kryo serializer
+ }
+
+ public InitiateCheckpointingEvent(SafeKeeperId safeKeeperId) {
+ super(safeKeeperId);
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java b/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java
new file mode 100644
index 0000000..38c6e2a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/LoggingStorageCallbackFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+import org.apache.s4.ft.SafeKeeper.StorageResultCode;
+
+import org.apache.log4j.Logger;
+
+/**
+ * A factory for creating storage callbacks that simply log callback results
+ *
+ *
+ */
+public class LoggingStorageCallbackFactory implements StorageCallbackFactory {
+
+ @Override
+ public StorageCallback createStorageCallback() {
+ return new StorageCallbackLogger();
+ }
+
+ /**
+ * A basic storage callback that simply logs results from storage operations
+ *
+ */
+ static class StorageCallbackLogger implements StorageCallback {
+
+ private static Logger logger = Logger.getLogger("s4-ft");
+
+ @Override
+ public void storageOperationResult(StorageResultCode code, Object message) {
+ if (StorageResultCode.SUCCESS == code) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Callback from storage: " + StorageResultCode.SUCCESS.name() + " : " + message);
+ }
+ } else {
+ logger.warn("Callback from storage: " + StorageResultCode.FAILURE.name() + " : " + message);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java b/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java
new file mode 100644
index 0000000..26a3530
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/RecoveryEvent.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+/**
+ *
+ * Event that triggers the recovery of a checkpoint.
+ *
+ */
+public class RecoveryEvent extends CheckpointingEvent {
+
+ public RecoveryEvent() {
+ // as required by default kryo serializer
+ }
+
+ public RecoveryEvent(SafeKeeperId safeKeeperId) {
+ super(safeKeeperId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java b/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java
new file mode 100644
index 0000000..d2656a6
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/RedisStateStorage.java
@@ -0,0 +1,127 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+import org.apache.s4.ft.SafeKeeper.StorageResultCode;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+
+/**
+ * <p>
+ * This class implements a storage backend based on Redis. Redis is a key-value
+ * store.
+ * </p>
+ * <p>
+ * See {@link http://redis.io/} for more information.
+ * </p>
+ * <p>
+ * Redis must be running as an external service. References to this external
+ * services must be injected during the initialization of the S4 platform.
+ * </p>
+ *
+ *
+ */
+public class RedisStateStorage implements StateStorage {
+
+ static Logger logger = Logger.getLogger("s4-ft");
+ private JedisPool jedisPool;
+ private String redisHost;
+ private int redisPort;
+
+ public void clear() {
+ Jedis jedis = jedisPool.getResource();
+ try {
+ jedis.flushAll();
+ } finally {
+ jedisPool.returnResource(jedis);
+ }
+ }
+
+ public void init() {
+ JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
+ // TODO optional parameterization
+ jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort);
+ }
+
+ @Override
+ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+ Jedis jedis = jedisPool.getResource();
+ String statusCode = "UNKNOWN";
+ try {
+ statusCode = jedis.set(key.getStringRepresentation().getBytes(), state);
+ } finally {
+ jedisPool.returnResource(jedis);
+ }
+ if ("OK".equals(statusCode)) {
+ callback.storageOperationResult(StorageResultCode.SUCCESS, "Redis result code is [" + statusCode + "] for key [" + key.toString() +"]");
+ } else {
+ callback.storageOperationResult(StorageResultCode.FAILURE, "Unexpected redis result code : [" + statusCode + "] for key [" + key.toString() +"]");
+ }
+ }
+
+ @Override
+ public byte[] fetchState(SafeKeeperId key) {
+ Jedis jedis = jedisPool.getResource();
+ try {
+ return jedis.get(key.getStringRepresentation().getBytes());
+ } finally {
+ jedisPool.returnResource(jedis);
+ }
+ }
+
+ @Override
+ public Set<SafeKeeperId> fetchStoredKeys() {
+ Jedis jedis = jedisPool.getResource();
+ try {
+ Set<String> keys = jedis.keys("*");
+ Set<SafeKeeperId> result = new HashSet<SafeKeeperId>(keys.size());
+ for (String s : keys)
+ result.add(new SafeKeeperId(s));
+ return result;
+ } finally {
+ jedisPool.returnResource(jedis);
+ }
+
+ }
+
+ public String getRedisHost() {
+ return redisHost;
+ }
+
+ public void setRedisHost(String redisHost) {
+ this.redisHost = redisHost;
+ }
+
+ public int getRedisPort() {
+ return redisPort;
+ }
+
+ public void setRedisPort(int redisPort) {
+ this.redisPort = redisPort;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
new file mode 100644
index 0000000..950649b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+import org.apache.s4.dispatcher.Dispatcher;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+/**
+ *
+ * <p>
+ * This class is responsible for coordinating interactions between the S4 event
+ * processor and the checkpoint storage backend. In particular, it schedules
+ * asynchronous save tasks to be executed on the backend.
+ * </p>
+ *
+ *
+ *
+ */
+public class SafeKeeper {
+
+ public enum StorageResultCode {
+ SUCCESS, FAILURE
+ }
+
+ private static Logger logger = Logger.getLogger("s4-ft");
+ private StateStorage stateStorage;
+ private Dispatcher loopbackDispatcher;
+ private SerializerDeserializer serializer;
+ private Hasher hasher;
+ // monitor field injection through a latch
+ private CountDownLatch signalReady = new CountDownLatch(2);
+ private CountDownLatch signalNodesAvailable = new CountDownLatch(1);
+ private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
+
+ ThreadPoolExecutor threadPool;
+
+ int maxWriteThreads = 1;
+ int writeThreadKeepAliveSeconds = 120;
+ int maxOutstandingWriteRequests = 1000;
+
+ public SafeKeeper() {
+ }
+
+ /**
+ * <p>
+ * This init() method <b>must</b> be called by the dependency injection
+ * framework. It waits until all required dependencies are injected in
+ * SafeKeeper, and until the node count is accessible from the communication
+ * layer.
+ * </p>
+ */
+ public void init() {
+ try {
+ getReadySignal().await();
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ threadPool = new ThreadPoolExecutor(1, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
+ logger.debug("Started thread pool with maxWriteThreads=[" + maxWriteThreads
+ + "], writeThreadKeepAliveSeconds=[" + writeThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
+ + maxOutstandingWriteRequests + "]");
+
+ int nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
+ // required wait until nodes are available
+ while (nodeCount == 0) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ignored) {
+ }
+ nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
+ }
+
+ signalNodesAvailable.countDown();
+ }
+
+ /**
+ * Forwards a call to checkpoint a PE to the backend storage.
+ *
+ * @param key
+ * safeKeeperId
+ * @param state
+ * checkpoint data
+ */
+ public void saveState(SafeKeeperId safeKeeperId, byte[] serializedState) {
+ StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+ try {
+ threadPool.submit(createSaveStateTask(safeKeeperId, serializedState));
+ } catch (RejectedExecutionException e) {
+ storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+ "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
+ + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests
+ + "]");
+ }
+ }
+
+ private SaveStateTask createSaveStateTask(SafeKeeperId safeKeeperId, byte[] serializedState) {
+ return new SaveStateTask(safeKeeperId, serializedState, storageCallbackFactory.createStorageCallback(),
+ stateStorage);
+ }
+
+ /**
+ * Fetches checkpoint data from storage for a given PE
+ *
+ * @param key
+ * safeKeeperId
+ * @return checkpoint data
+ */
+ public byte[] fetchSerializedState(SafeKeeperId key) {
+
+ try {
+ signalNodesAvailable.await();
+ } catch (InterruptedException ignored) {
+ }
+ byte[] result = null;
+ result = stateStorage.fetchState(key);
+ return result;
+ }
+
+ /**
+ * Generates a checkpoint event for a given PE, and enqueues it in the local
+ * event queue.
+ *
+ * @param pe
+ * reference to a PE
+ */
+ public void generateCheckpoint(AbstractPE pe) {
+ InitiateCheckpointingEvent initiateCheckpointingEvent = new InitiateCheckpointingEvent(pe.getSafeKeeperId());
+
+ List<List<String>> compoundKeyNames;
+ if (pe.getKeyValueString() == null) {
+ logger.warn("Only keyed PEs can be checkpointed. Current PE [" + pe.getSafeKeeperId()
+ + "] will not be checkpointed.");
+ } else {
+ List<String> list = new ArrayList<String>(1);
+ list.add("");
+ compoundKeyNames = new ArrayList<List<String>>(1);
+ compoundKeyNames.add(list);
+ loopbackDispatcher.dispatchEvent(pe.getId() + "_checkpointing", compoundKeyNames,
+ initiateCheckpointingEvent);
+ }
+ }
+
+ /**
+ * Generates a recovery event, and enqueues it in the local event queue.<br/>
+ * This can be used for an eager recovery mechanism.
+ *
+ * @param safeKeeperId
+ * safeKeeperId to recover
+ */
+ public void initiateRecovery(SafeKeeperId safeKeeperId) {
+ RecoveryEvent recoveryEvent = new RecoveryEvent(safeKeeperId);
+ loopbackDispatcher.dispatchEvent(safeKeeperId.getPrototypeId() + "_recovery", recoveryEvent);
+ }
+
+ public void setSerializer(SerializerDeserializer serializer) {
+ this.serializer = serializer;
+ }
+
+ public SerializerDeserializer getSerializer() {
+ return serializer;
+ }
+
+ public int getPartitionId() {
+ return ((CommLayerEmitter) loopbackDispatcher.getEventEmitter()).getListener().getId();
+ }
+
+ public void setHasher(Hasher hasher) {
+ this.hasher = hasher;
+ signalReady.countDown();
+ }
+
+ public Hasher getHasher() {
+ return hasher;
+ }
+
+ public void setStateStorage(StateStorage stateStorage) {
+ this.stateStorage = stateStorage;
+ }
+
+ public StateStorage getStateStorage() {
+ return stateStorage;
+ }
+
+ public void setLoopbackDispatcher(Dispatcher dispatcher) {
+ this.loopbackDispatcher = dispatcher;
+ signalReady.countDown();
+ }
+
+ public Dispatcher getLoopbackDispatcher() {
+ return this.loopbackDispatcher;
+ }
+
+ public CountDownLatch getReadySignal() {
+ return signalReady;
+ }
+
+ public StorageCallbackFactory getStorageCallbackFactory() {
+ return storageCallbackFactory;
+ }
+
+ public void setStorageCallbackFactory(StorageCallbackFactory storageCallbackFactory) {
+ this.storageCallbackFactory = storageCallbackFactory;
+ }
+
+ public int getMaxWriteThreads() {
+ return maxWriteThreads;
+ }
+
+ public void setMaxWriteThreads(int maxWriteThreads) {
+ this.maxWriteThreads = maxWriteThreads;
+ }
+
+ public int getWriteThreadKeepAliveSeconds() {
+ return writeThreadKeepAliveSeconds;
+ }
+
+ public void setWriteThreadKeepAliveSeconds(int writeThreadKeepAliveSeconds) {
+ this.writeThreadKeepAliveSeconds = writeThreadKeepAliveSeconds;
+ }
+
+ public int getMaxOutstandingWriteRequests() {
+ return maxOutstandingWriteRequests;
+ }
+
+ public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
+ this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java
new file mode 100644
index 0000000..0095e7e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeperId.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <p>
+ * Identifier of PEs. It is used to identify checkpointed PEs in the storage
+ * backend.
+ * </p>
+ * <p>
+ * The storage backend is responsible for converting this identifier to whatever
+ * internal representation is most suitable for it.
+ * </p>
+ * <p>
+ * This class provides methods for getting a compact String representation of
+ * the identifier and for creating an identifier from a String representation.
+ * </p>
+ *
+ */
+public class SafeKeeperId {
+
+ private String prototypeId;
+ private String keyed;
+
+ private static final Pattern STRING_REPRESENTATION_PATTERN = Pattern
+ .compile("\\[(\\S*)\\];\\[(\\S*)\\]");
+
+ public SafeKeeperId() {
+ }
+
+ /**
+ *
+ * @param prototypeID
+ * id of the PE as returned by {@link ProcessingElement#getId()
+ * getId()} method
+ * @param keyed
+ * keyed attribute(s)
+ */
+ public SafeKeeperId(String prototypeID, String keyed) {
+ super();
+ this.prototypeId = prototypeID;
+ this.keyed = keyed;
+ }
+
+ public SafeKeeperId(String keyAsString) {
+ Matcher matcher = STRING_REPRESENTATION_PATTERN.matcher(keyAsString);
+
+ try {
+ matcher.find();
+ prototypeId = "".equals(matcher.group(1)) ? null : matcher.group(1);
+ keyed = "".equals(matcher.group(2)) ? null : matcher.group(2);
+ } catch (IndexOutOfBoundsException e) {
+
+ }
+
+ }
+
+ public String getKey() {
+ return keyed;
+ }
+
+ public String getPrototypeId() {
+ return prototypeId;
+ }
+
+ public String toString() {
+ return "[PROTO_ID];[KEYED] --> " + getStringRepresentation();
+ }
+
+ public String getStringRepresentation() {
+ return "[" + (prototypeId == null ? "" : prototypeId) + "];["
+ + (keyed == null ? "" : keyed) + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ return getStringRepresentation().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+
+ SafeKeeperId other = (SafeKeeperId) obj;
+ if (keyed == null) {
+ if (other.keyed != null)
+ return false;
+ } else if (!keyed.equals(other.keyed))
+ return false;
+ if (prototypeId == null) {
+ if (other.prototypeId != null)
+ return false;
+ } else if (!prototypeId.equals(other.prototypeId))
+ return false;
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
new file mode 100644
index 0000000..79a881d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+
+/**
+ *
+ * Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
+ *
+ */
+public class SaveStateTask implements Runnable {
+
+ SafeKeeperId safeKeeperId;
+ byte[] state;
+ StorageCallback storageCallback;
+ StateStorage stateStorage;
+
+ public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
+ super();
+ this.safeKeeperId = safeKeeperId;
+ this.state = state;
+ this.storageCallback = storageCallback;
+ this.stateStorage = stateStorage;
+ }
+
+ @Override
+ public void run() {
+ stateStorage.saveState(safeKeeperId, state, storageCallback);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java b/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java
new file mode 100644
index 0000000..6df9f18
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/StateStorage.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+import java.util.Set;
+
+/**
+ * <p>
+ * Defines the methods that must be implemented by backend storage for
+ * checkpoints.
+ * </p>
+ *
+ */
+public interface StateStorage {
+
+ /**
+ * Stores a checkpoint.
+ *
+ * <p>
+ * NOTE: we don't handle any failure/success return value, because all
+ * failure/success notifications go through the StorageCallback reference
+ * </p>
+ * @param key
+ * safeKeeperId
+ * @param state
+ * checkpoint data as a byte array
+ * @param callback
+ * callback for receiving notifications of storage operations.
+ * This callback is configurable
+ */
+ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback);
+
+ /**
+ * Fetches data for a stored checkpoint.
+ * <p>
+ * Must return null if storage does not contain this key.
+ * </p>
+ *
+ * @param key
+ * safeKeeperId for this checkpoint
+ *
+ * @return stored checkpoint data, or null if the storage does not contain
+ * data for the given key
+ */
+ public byte[] fetchState(SafeKeeperId key);
+
+ /**
+ * Fetches all stored safeKeeper Ids.
+ *
+ * @return all stored safeKeeper Ids.
+ */
+ public Set<SafeKeeperId> fetchStoredKeys();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java b/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java
new file mode 100644
index 0000000..605597f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/StorageCallback.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+/**
+ *
+ * Callback for reporting the result of an asynchronous storage operation
+ *
+ */
+public interface StorageCallback {
+
+ /**
+ * Notifies the result of a storage operation
+ *
+ * @param resultCode code for the result : {@link SafeKeeper.StorageResultCode SafeKeeper.StorageResultCode}
+ * @param message whatever message object is suitable
+ */
+ public void storageOperationResult(SafeKeeper.StorageResultCode resultCode,
+ Object message);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java b/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java
new file mode 100644
index 0000000..a402ac9
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/StorageCallbackFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+/**
+ * A factory for creating storage callbacks. Storage callback implementations
+ * that can take specific actions upon success or failure of asynchronous
+ * storage operations.
+ *
+ */
+public interface StorageCallbackFactory {
+
+ /**
+ * Factory method
+ *
+ * @return returns a StorageCallback instance
+ */
+ public StorageCallback createStorageCallback();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/package.html
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/package.html b/s4-core/src/main/java/org/apache/s4/ft/package.html
new file mode 100644
index 0000000..37fe51d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/package.html
@@ -0,0 +1,23 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+<head>
+</head>
+<body bgcolor="white">
+ <p>This package contains classes for providing some fault tolerance
+ to S4 PEs.</p>
+ <p>The current approach is based on checkpointing.</p>
+ <p>Checkpoints are taken periodically (configurable by time or
+ frequency of application events), and when restarting an S4 node,
+ saved checkpoints are automatically and lazily restored.</p>
+ <p>Lazy restoration is triggered by an application event to a PE
+ that has not yet been restored.</p>
+ <p>Checkpoints are stored in storage backends. Storage backends may
+ implement eager techniques to prefetch checkpoint data to be
+ recovered.
+ <p>
+ The application programmer must take care of marking as <b>transient</b>
+ the fields that do not have to be persisted (or cannot be persisted).
+ <p>Storage backends are pluggable and we provide some default
+ implementations in this package</p>
+</body>
+</html>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java b/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java
new file mode 100644
index 0000000..a67153c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/CommLayerListener.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.listener;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.low_level_listener_badmsg_ct;
+import static org.apache.s4.util.MetricsName.low_level_listener_msg_drop_ct;
+import static org.apache.s4.util.MetricsName.low_level_listener_msg_in_ct;
+import static org.apache.s4.util.MetricsName.low_level_listener_qsz;
+import static org.apache.s4.util.MetricsName.s4_core_exit_ct;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.comm.core.CommEventCallback;
+import org.apache.s4.comm.core.CommLayerState;
+import org.apache.s4.comm.core.Deserializer;
+import org.apache.s4.comm.core.ListenerProcess;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+public class CommLayerListener implements EventListener, Runnable {
+ private static Logger logger = Logger.getLogger(CommLayerListener.class);
+ private int dequeuerCount = 12;
+ private Set<EventHandler> handlers = new HashSet<EventHandler>();
+ ListenerProcess process;
+ private BlockingQueue<Object> messageQueue;
+ private int maxQueueSize = 1000;
+ private String clusterManagerAddress;
+ private String appName;
+ private Object listenerConfig;
+ private Monitor monitor;
+ private int partitionId = -1;
+ private int zkConnected = 1;
+ private SerializerDeserializer serDeser;
+
+ public void setSerDeser(SerializerDeserializer serDeser) {
+ this.serDeser = serDeser;
+ }
+
+ public void setMonitor(Monitor monitor) {
+ this.monitor = monitor;
+ monitor.setDefaultValue("tid", partitionId);
+ }
+
+ public void setMaxQueueSize(int maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ @Override
+ public int getId() {
+ return partitionId;
+ }
+
+ @Override
+ public String getAppName() {
+ return appName;
+ }
+
+ public void setAppName(String appName) {
+ this.appName = appName;
+ }
+
+ public String getClusterManagerAddress() {
+ return clusterManagerAddress;
+ }
+
+ public void setClusterManagerAddress(String clusterManagerAddress) {
+ this.clusterManagerAddress = clusterManagerAddress;
+ }
+
+ @Override
+ public void addHandler(EventHandler handler) {
+ handlers.add(handler);
+ }
+
+ @Override
+ public boolean removeHandler(EventHandler handler) {
+ return handlers.remove(handler);
+ }
+
+ public Object getListenerConfig() {
+ return this.listenerConfig;
+ }
+
+ public void init() {
+ System.err.println("appName=" + appName);
+ process = new ListenerProcess(clusterManagerAddress, appName);
+ process.setDeserializer(new PassThroughDeserializer());
+ CommEventCallback callbackHandler = new CommEventCallback() {
+ @Override
+ public void handleCallback(Map<String, Object> event) {
+ if (event != null) {
+ CommLayerState state = (CommLayerState) event.get("state");
+ if (state != null) {
+ if (state == CommLayerState.INITIALIZED) {
+ logger.info("Communication layer initialized: source:"
+ + event.get("source"));
+ } else if (state == CommLayerState.BROKEN) {
+ logger.error("Communication layer broken: source:"
+ + event.get("source"));
+ logger.error("System exiting so that process can restart.");
+ if (monitor != null) {
+ monitor.set(s4_core_exit_ct.toString(),
+ 1,
+ S4_CORE_METRICS.toString());
+ }
+ // should flush stats before exiting
+ monitor.flushStats();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ System.exit(3);
+ }
+ }
+ }
+ }
+ };
+ process.setCallbackHandler(callbackHandler);
+
+ messageQueue = new LinkedBlockingQueue<Object>(maxQueueSize);
+
+ // listenerConfig = process.acquireTaskAndCreateListener(map);
+ Thread t = new Thread(this);
+ t.setPriority(Thread.MAX_PRIORITY);
+ t.start();
+
+ if (System.getProperty("DequeuerCount") != null) {
+ dequeuerCount = Integer.parseInt(System.getProperty("DequeuerCount"));
+ }
+
+ System.out.println("dequeuer number: " + dequeuerCount);
+
+ for (int i = 0; i < dequeuerCount; i++) {
+ t = new Thread(new Dequeuer(this, i));
+ // t.setPriority(Thread.MIN_PRIORITY);
+ t.start();
+ }
+ }
+
+ // This is the actual raw listener, which simply listens for messages on the
+ // socket
+ public void run() {
+ boolean isAddMessageSucceeded = false;
+ // acquire a task to do
+ synchronized (this) {
+ Map<String, String> map = new HashMap<String, String>();
+ try {
+ map.put("ListenerId", InetAddress.getLocalHost().getHostName()
+ + "_" + System.getProperty("pid") + "_"
+ + Thread.currentThread().getId());
+ map.put("address", InetAddress.getLocalHost().getHostAddress());
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ logger.info("Waiting to acquire task");
+ listenerConfig = process.acquireTaskAndCreateListener(map);
+ logger.info("acquired task with config:" + listenerConfig);
+ Map<String, String> configMap = (Map<String, String>) listenerConfig;
+ String partition = configMap.get("partition");
+ if (partition != null) {
+ partitionId = Integer.parseInt(partition);
+ monitor.setDefaultValue("tid", partitionId);
+ logger.info("tid is set to " + partitionId);
+ }
+ this.notify();
+ }
+ while (!Thread.interrupted()) {
+ byte[] message = (byte[]) process.listen();
+
+ try {
+ isAddMessageSucceeded = messageQueue.offer(message);
+ if (monitor != null) {
+ monitor.set(low_level_listener_qsz.toString(),
+ messageQueue.size(),
+ S4_CORE_METRICS.toString());
+ if (isAddMessageSucceeded) {
+ monitor.increment(low_level_listener_msg_in_ct.toString(),
+ 1,
+ S4_CORE_METRICS.toString());
+ } else {
+ monitor.increment(low_level_listener_msg_drop_ct.toString(),
+ 1,
+ S4_CORE_METRICS.toString());
+ }
+ }
+ } catch (Exception e) {
+ Logger.getLogger("s4")
+ .error("Exception in monitor metrics on thread "
+ + Thread.currentThread().getId(),
+ e);
+ }
+ }
+ }
+
+ public Object takeMessage() throws InterruptedException {
+ return messageQueue.take();
+ }
+
+ class Dequeuer implements Runnable {
+ private int id;
+ private CommLayerListener rawListener;
+
+ public Dequeuer(CommLayerListener rawListener, int id) {
+ this.id = id;
+ this.rawListener = rawListener;
+ }
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ byte[] rawMessage = (byte[]) rawListener.takeMessage();
+ processMessage(rawMessage);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void processMessage(byte[] rawMessage) {
+ // convert the byte array into an event object
+ EventWrapper eventWrapper = null;
+ try {
+ eventWrapper = (EventWrapper) serDeser.deserialize(rawMessage);
+
+ } catch (RuntimeException rte) {
+ Logger.getLogger("s4")
+ .error("Error converting message to an event: ", rte);
+ if (monitor != null) {
+ monitor.increment(low_level_listener_badmsg_ct.toString(),
+ 1,
+ S4_CORE_METRICS.toString());
+ }
+ return;
+ }
+
+ if (eventWrapper != null) {
+ for (EventHandler handler : handlers) {
+ try {
+ handler.processEvent(eventWrapper);
+ } catch (Exception e) {
+ Logger.getLogger("s4")
+ .error("Error calling processEvent on handler", e);
+ }
+ }
+ }
+ }
+
+ }
+
+ public class PassThroughDeserializer implements Deserializer {
+ public Object deserialize(byte[] input) {
+ return input;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java b/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java
new file mode 100644
index 0000000..b6ad884
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/EventHandler.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.listener;
+
+import org.apache.s4.collector.EventWrapper;
+
+public interface EventHandler {
+ void processEvent(EventWrapper eventWrapper);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/EventListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/EventListener.java b/s4-core/src/main/java/org/apache/s4/listener/EventListener.java
new file mode 100644
index 0000000..2e2cd1d
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/EventListener.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.listener;
+
+public interface EventListener extends EventProducer {
+
+ int getId();
+
+ String getAppName();
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java b/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java
new file mode 100644
index 0000000..2065332
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/listener/EventProducer.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.listener;
+
+public interface EventProducer {
+
+ void addHandler(EventHandler handler);
+
+ boolean removeHandler(EventHandler handler);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java b/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java
new file mode 100644
index 0000000..d9008fe
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/logger/Log4jMonitor.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.logger;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Log4jMonitor extends TimerTask implements Monitor {
+ Map<String, Integer> metricMap = new ConcurrentHashMap<String, Integer>();
+ private String loggerName = "s4";
+ private int flushInterval = 600; // default is every 10 minutes
+
+ private Timer timer = new Timer();
+ private Map<String, Integer> defaultMap = new HashMap<String, Integer>();
+
+ public void setLoggerName(String loggerName) {
+ this.loggerName = loggerName;
+ }
+
+ public void setFlushInterval(int flushInterval) {
+ this.flushInterval = flushInterval;
+ }
+
+ public void init() {
+ if (flushInterval > 0) {
+ timer.scheduleAtFixedRate(this,
+ flushInterval * 1000,
+ flushInterval * 1000);
+ }
+ }
+
+ // TODO: this will be removed after changing above functions
+ public void set(String metricName, int value) {
+ metricMap.put(metricName, value);
+ }
+
+ public void flushStats() {
+ org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(loggerName);
+ for (String key : metricMap.keySet()) {
+ String message = key + " = " + metricMap.get(key);
+ logger.info(message);
+ metricMap.remove(key);
+ }
+ if (defaultMap != null) {
+ for (String key : defaultMap.keySet()) {
+ // TODO: need to be changed
+ set(key, defaultMap.get(key));
+ }
+ }
+ }
+
+ public void run() {
+ flushStats();
+ }
+
+ @Override
+ public void increment(String metricName, int increment) {
+ Integer currValue = metricMap.get(metricName);
+ if (currValue == null) {
+ currValue = 0;
+ }
+ currValue += increment;
+ metricMap.put(metricName, currValue);
+ }
+
+ @Override
+ public void setDefaultValue(String key, int val) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void increment(String metricName, int increment, String metricEventName, String... furtherDistinctions) {
+ increment(buildMetricName(metricName,
+ metricEventName,
+ furtherDistinctions),
+ increment);
+
+ }
+
+ @Override
+ public void set(String metricName, int value, String metricEventName, String... furtherDistinctions) {
+ metricMap.put(buildMetricName(metricName,
+ metricEventName,
+ furtherDistinctions),
+ value);
+ }
+
+ private String buildMetricName(String metricName, String metricEventName, String[] furtherDistinctions) {
+ StringBuffer sb = new StringBuffer(metricEventName);
+ sb.append(":");
+ sb.append(metricName);
+ if (furtherDistinctions != null) {
+ for (String furtherDistinction : furtherDistinctions) {
+ sb.append(":");
+ sb.append(furtherDistinction);
+ }
+ }
+ return sb.toString().intern();
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/logger/Monitor.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/logger/Monitor.java b/s4-core/src/main/java/org/apache/s4/logger/Monitor.java
new file mode 100644
index 0000000..1da30ad
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/logger/Monitor.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.logger;
+
+public interface Monitor {
+ public void increment(String metricName, int increment, String metricEventName, String... aggKeys);
+
+ public void increment(String metricName, int increment);
+
+ public void set(String metricName, int value, String metricEventName, String... aggKeys);
+
+ public void set(String metricName, int value);
+
+ public void flushStats();
+
+ public void setDefaultValue(String key, int val);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java b/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java
new file mode 100644
index 0000000..71ab28f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/logger/TraceMessage.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TraceMessage {
+ private long traceId;
+ private Map<String, String> propertyMap = new HashMap<String, String>();
+
+ public void setTraceId(long traceId) {
+ this.traceId = traceId;
+ }
+
+ public long getTraceId() {
+ return this.traceId;
+ }
+
+ public void setProperty(String name, String value) {
+ this.propertyMap.put(name, value);
+ }
+
+ public String toString() {
+ return this.traceId + "; " + this.propertyMap.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java b/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java
new file mode 100644
index 0000000..20744e7
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/PrototypeRequest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.processor.PrototypeWrapper;
+import org.apache.s4.util.MethodInvoker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * A request for a value from the prototype of PEs.
+ */
+public class PrototypeRequest extends Request {
+
+ private final List<String> query;
+
+ public PrototypeRequest(List<String> query, RInfo info) {
+ this.query = query;
+ this.rinfo = info;
+ }
+
+ public PrototypeRequest(List<String> query) {
+ this.query = query;
+ this.rinfo = null;
+ }
+
+ public PrototypeRequest() {
+ this.query = Collections.<String> emptyList();
+ this.rinfo = null;
+ }
+
+ public String toString() {
+ return "PROTOTYPE: query=[" + query + "] info=[" + rinfo + "]";
+ }
+
+ /**
+ * List of queries to execute.
+ *
+ * @return list of queries
+ */
+ public List<String> getQuery() {
+ return query;
+ }
+
+ /**
+ * Evaluate Request on a particular PE Prototype.
+ *
+ * @param pw
+ * prototype
+ * @return Response object.
+ */
+ public Response evaluate(PrototypeWrapper pw) {
+
+ HashMap<String, Object> results = new HashMap<String, Object>();
+ HashMap<String, String> exceptions = new HashMap<String, String>();
+
+ for (String q : query) {
+ if (q.startsWith("$")) {
+ // requests for getters should be of the form $fieldA.
+ // Responds with pe.getFieldA()
+ try {
+ Object res = MethodInvoker.invokeGetter(pw, q.substring(1));
+ results.put(q, res);
+
+ } catch (Exception e) {
+ exceptions.put(q, e.toString());
+ }
+
+ } else if (q.equalsIgnoreCase("count")) {
+ // Some aggregate operators
+ results.put(q, pw.getPECount());
+
+ } else {
+ exceptions.put(q, "Query Parse Error");
+ }
+ }
+
+ return new Response(results, exceptions, this);
+ }
+
+ public List<CompoundKeyInfo> partition(Hasher h, String delim, int partCount) {
+ // send to all partitions
+ List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+
+ for (int i = 0; i < partCount; ++i) {
+ CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+ partitionInfo.setPartitionId(i);
+ partitionInfoList.add(partitionInfo);
+ }
+
+ return partitionInfoList;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/Request.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/Request.java b/s4-core/src/main/java/org/apache/s4/message/Request.java
new file mode 100644
index 0000000..c6cae68
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/Request.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.util.GsonUtil;
+
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.gson.InstanceCreator;
+
+abstract public class Request {
+
+ protected RInfo rinfo = nullRInfo;
+
+ public final static RInfo nullRInfo = new NullRInfo();
+
+ /**
+ * Requester/Return information
+ */
+ abstract public static class RInfo {
+
+ private long id = 0;
+
+ /**
+ * Identity of request. This is typically specified by the requester.
+ */
+ public long getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ private String stream;
+
+ /**
+ * Stream name on which response should be sent.
+ *
+ * @return stream name.
+ */
+ public String getStream() {
+ return stream;
+ }
+
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ private int partition;
+
+ /**
+ * Partition Id from which this request originated. This may be used to
+ * return a response to the same partition.
+ *
+ * @return partition id
+ */
+ public int getPartition() {
+ return partition;
+ }
+
+ public void setPartition(int partition) {
+ this.partition = partition;
+ }
+
+ // Tell Gson how to instantiate one of these: create a ClientRInfo
+ static {
+ InstanceCreator<RInfo> creator = new InstanceCreator<RInfo>() {
+ public org.apache.s4.message.Request.RInfo createInstance(Type type) {
+ return new org.apache.s4.message.Request.ClientRInfo();
+ }
+ };
+
+ GsonUtil.registerTypeAdapter(RInfo.class, creator);
+ }
+
+ }
+
+ public static class ClientRInfo extends RInfo {
+ private UUID requesterUUID = null;
+
+ /**
+ * Identity of requesting client. This is used to send the response back
+ * to the client.
+ *
+ * @return UUID of the client from which the request originated.
+ */
+ public UUID getRequesterUUID() {
+ return requesterUUID;
+ }
+
+ public void setRequesterUUID(UUID requesterUUID) {
+ this.requesterUUID = requesterUUID;
+ }
+
+ public String toString() {
+ return "(id:" + getId() + " requester:" + getRequesterUUID()
+ + " partition:" + getPartition() + " stream:" + getStream()
+ + ")";
+ }
+ }
+
+ public static class PERInfo extends RInfo {
+ private String requesterKey = null;
+
+ /**
+ * Identity of requesting PE. This is used to route the response back to
+ * the originating PE.
+ *
+ * @return key value of the PE from which the request originated.
+ */
+ public String getRequesterKey() {
+ return requesterKey;
+ }
+
+ public void setRequesterKey(String requesterKey) {
+ this.requesterKey = requesterKey;
+ }
+
+ public String toString() {
+ return "(id:" + getId() + " requester:" + getRequesterKey()
+ + " partition:" + getPartition() + " stream:" + getStream()
+ + ")";
+ }
+ }
+
+ public static class NullRInfo extends RInfo {
+ public NullRInfo() {
+ super.stream = "@null";
+ super.partition = -1;
+ }
+ }
+
+ /**
+ * Query metainformation.
+ *
+ * @return Info representing origin of request.
+ */
+ public RInfo getRInfo() {
+ return rinfo;
+ }
+
+ /**
+ * Query metainformation.
+ */
+ public void setRInfo(RInfo rinfo) {
+ this.rinfo = rinfo;
+ }
+
+ /**
+ * Partition itself. This is used by the default partitioner.
+ *
+ * @param h
+ * hasher
+ * @param delim
+ * delimiter used to concatenate compound key values
+ * @param partCount
+ * number of partitions
+ * @return list of compound keys: one event may have to be sent to multiple
+ * nodes.
+ */
+ abstract public List<CompoundKeyInfo> partition(Hasher h, String delim,
+ int partCount);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/Response.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/Response.java b/s4-core/src/main/java/org/apache/s4/message/Response.java
new file mode 100644
index 0000000..cf01d95
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/Response.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class Response {
+
+ private Map<String, Object> result;
+
+ private Map<String, String> exception;
+
+ private Request request;
+
+ public Response(Map<String, Object> result, Request request) {
+ this.result = result;
+ this.request = request;
+ }
+
+ public Response(Map<String, Object> result, Map<String, String> exception,
+ Request request) {
+ this.result = result;
+ this.exception = exception;
+ this.request = request;
+ }
+
+ public Response() {
+ result = null;
+ exception = null;
+ request = null;
+ }
+
+ /**
+ * Result of a request.
+ *
+ * @return map from query strings o corresponding values.
+ */
+ public Map<String, Object> getResult() {
+ return result;
+ }
+
+ public Map<String, String> getException() {
+ return exception;
+ }
+
+ public Request getRequest() {
+ return request;
+ }
+
+ public Request.RInfo getRInfo() {
+ return (request != null ? request.getRInfo() : null);
+ }
+
+ public String toString() {
+ return "[" + result + "] (" + request + ")";
+ }
+
+ public List<CompoundKeyInfo> partition(int partCount) {
+ // partition id is available from the request info object
+
+ int p = this.getRInfo().getPartition();
+ List<CompoundKeyInfo> partitionInfoList = null;
+
+ if (p >= 0 && p < partCount) {
+ CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+ partitionInfo.setPartitionId(p);
+
+ partitionInfoList = new ArrayList<CompoundKeyInfo>();
+ partitionInfoList.add(partitionInfo);
+ }
+
+ return partitionInfoList;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java b/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java
new file mode 100644
index 0000000..b2c1551
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/message/SinglePERequest.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.message;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.util.MethodInvoker;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+import org.springframework.util.StringUtils;
+
+/**
+ * A request for a value from a particular PE.
+ */
+public class SinglePERequest extends Request {
+
+ private final List<String> target;
+
+ private final List<String> query;
+
+ public SinglePERequest(List<String> target, List<String> query, RInfo info) {
+ this.target = target;
+ this.query = query;
+ this.rinfo = info;
+ }
+
+ public SinglePERequest(List<String> target, List<String> query) {
+ this.target = target;
+ this.query = query;
+ this.rinfo = null;
+ }
+
+ public SinglePERequest() {
+ this.target = Collections.<String> emptyList();
+ this.query = Collections.<String> emptyList();
+ this.rinfo = null;
+ }
+
+ public String toString() {
+ return "target:" + target + " query:" + query + " info:" + rinfo;
+ }
+
+ /**
+ * Fields used to target a particular PE.
+ *
+ * @return list of targeting values. Order matters.
+ */
+ public List<String> getTarget() {
+ return target;
+ }
+
+ /**
+ * List of field names that have to be read form target PE.
+ *
+ * @return list of field name strings.
+ */
+ public List<String> getQuery() {
+ return query;
+ }
+
+ /**
+ * Evaluate Request on a particular PE.
+ *
+ * @param pe
+ * @return Response object.
+ */
+ public Response evaluate(AbstractPE pe) {
+
+ HashMap<String, Object> results = new HashMap<String, Object>();
+ HashMap<String, String> exceptions = new HashMap<String, String>();
+
+ for (String q : query) {
+ // requests for getters should be of the form $field. Responds with
+ // pe.getField()
+ if (q.startsWith("$")) {
+ try {
+ Object res = MethodInvoker.invokeGetter(pe, q.substring(1));
+ results.put(q, res);
+
+ } catch (Exception e) {
+ exceptions.put(q, e.toString());
+ }
+ }
+ }
+
+ return new Response(results, exceptions, this);
+ }
+
+ public List<CompoundKeyInfo> partition(Hasher h, String delim, int partCount) {
+ List<String> valueList = this.getTarget();
+ if (valueList == null)
+ return null;
+
+ // First, build the key
+ KeyInfo keyInfo = new KeyInfo();
+ // special kay name to denote request
+ keyInfo.addElementToPath("#req");
+
+ // for value, concatenate list of values from Request's target field.
+ String stringValue = StringUtils.collectionToDelimitedString(valueList,
+ delim);
+ keyInfo.setValue(stringValue);
+
+ // partition id is derived form string value, as usual
+ int partitionId = (int) (h.hash(stringValue) % partCount);
+
+ CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+ partitionInfo.addKeyInfo(keyInfo);
+ partitionInfo.setCompoundValue(stringValue);
+ partitionInfo.setPartitionId(partitionId);
+
+ List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+ partitionInfoList.add(partitionInfo);
+
+ return partitionInfoList;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
new file mode 100644
index 0000000..46dd19b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.persist;
+
+import org.apache.s4.util.clock.Clock;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+
+public class ConMapPersister implements Persister {
+ private AtomicInteger persistCount = new AtomicInteger(0);
+ private boolean selfClean = false;
+ private int cleanWaitTime = 40; // 20 seconds by default
+ private String loggerName = "s4";
+ ConcurrentHashMap<String, CacheEntry> cache;
+ Clock s4Clock;
+
+ private int startCapacity = 5000;
+
+ public void setStartCapacity(int startCapacity) {
+ this.startCapacity = startCapacity;
+ }
+
+ public int getStartCapacity() {
+ return startCapacity;
+ }
+
+ public void setSelfClean(boolean selfClean) {
+ this.selfClean = selfClean;
+ }
+
+ public void setCleanWaitTime(int cleanWaitTime) {
+ this.cleanWaitTime = cleanWaitTime;
+ }
+
+ public void setLoggerName(String loggerName) {
+ this.loggerName = loggerName;
+ }
+
+ public ConMapPersister(Clock s4Clock) {
+ this.s4Clock = s4Clock;
+ }
+
+ public void setS4Clock(Clock s4Clock) {
+ this.s4Clock = s4Clock;
+ }
+
+ public ConMapPersister() {
+ }
+
+ public void init() {
+ cache = new ConcurrentHashMap<String, CacheEntry>(this.getStartCapacity());
+
+ if (selfClean) {
+ Runnable r = new Runnable() {
+ public void run() {
+ while (!Thread.interrupted()) {
+ int cleanCount = ConMapPersister.this.cleanOutGarbage();
+ Logger.getLogger(loggerName).info("Cleaned out "
+ + cleanCount + " entries; Persister has "
+ + cache.size() + " entries");
+ try {
+ Thread.sleep(cleanWaitTime * 1000);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ };
+ Thread t = new Thread(r);
+ t.start();
+ t.setPriority(Thread.MIN_PRIORITY);
+ }
+ }
+
+ public int getQueueSize() {
+ return 0;
+ }
+
+ public int getPersistCount() {
+ return persistCount.get();
+ }
+
+ public int getCacheEntryCount() {
+ return cache.size();
+ }
+
+ public void setAsynch(String key, Object value, int period) {
+ // there really is no asynch for the local cache
+ set(key, value, period);
+ }
+
+ public void set(String key, Object value, int period) {
+ if (value == null) {
+ cache.remove(key);
+ return;
+ }
+ persistCount.getAndIncrement();
+ CacheEntry ce = new CacheEntry();
+ ce.value = value;
+ ce.period = period;
+ ce.addTime = s4Clock.getCurrentTime();
+ cache.put(key, ce);
+ }
+
+ public Object get(String key) {
+ CacheEntry ce = cache.get(key);
+ if (ce == null) {
+ return null;
+ }
+
+ if (ce.isExpired()) {
+ return null;
+ }
+
+ return ce.value;
+ }
+
+ public Map<String, Object> getBulk(String[] keys) {
+ HashMap map = new HashMap<String, Object>();
+ for (String key : keys) {
+ Object value = get(key);
+ if (value != null) {
+ map.put(key, value);
+ }
+ }
+ return map;
+ }
+
+ public Object getObject(String key) {
+ return get(key);
+ }
+
+ public Map<String, Object> getBulkObjects(String[] keys) {
+ return getBulk(keys);
+ }
+
+ public void remove(String key) {
+ cache.remove(key);
+ }
+
+ public int cleanOutGarbage() {
+ int count = 0;
+ for (Enumeration en = cache.keys(); en.hasMoreElements();) {
+ String key = (String) en.nextElement();
+ CacheEntry ce = cache.get(key);
+ if (ce != null && ce.isExpired()) {
+ count++;
+ cache.remove(key);
+ }
+ }
+ return count;
+ }
+
+ public Set<String> keySet() {
+ return cache.keySet();
+ }
+
+ public class CacheEntry {
+ Object value;
+ long addTime;
+ int period;
+
+ public boolean isExpired() {
+ if (period > 0) {
+ if ((addTime + (1000 * (long) period)) <= s4Clock.getCurrentTime()) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java b/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java
new file mode 100644
index 0000000..fca9d60
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/persist/DumpingPersister.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.persist;
+
+import org.apache.s4.processor.OutputFormatter;
+import org.apache.s4.util.clock.Clock;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.log4j.Logger;
+
+public class DumpingPersister extends ConMapPersister implements Runnable {
+
+ public DumpingPersister() {
+ }
+
+ public DumpingPersister(Clock s4Clock) {
+ super(s4Clock);
+ // TODO Auto-generated constructor stub
+ }
+
+ private String dumpFilePrefix;
+ private Map<String, OutputFormatter> regexFormatter;
+ private Pattern[] patterns;
+ private OutputFormatter[] formatters;
+ private long outputTimeBoundary;
+
+ public void setDumpFilePrefix(String dumpFilePrefix) {
+ this.dumpFilePrefix = dumpFilePrefix;
+ }
+
+ public void setRegexFormatter(Map<String, OutputFormatter> regexFormatter) {
+ this.regexFormatter = regexFormatter;
+ }
+
+ public void setOutputTimeBoundary(long outputTimeBoundary) {
+ this.outputTimeBoundary = outputTimeBoundary;
+ }
+
+ public void init() {
+ super.init();
+
+ Set<String> regexes = regexFormatter.keySet();
+ patterns = new Pattern[regexes.size()];
+ formatters = new OutputFormatter[regexes.size()];
+
+ int i = 0;
+ for (String regex : regexes) {
+ patterns[i] = Pattern.compile(regex);
+ formatters[i] = regexFormatter.get(regex);
+ i++;
+ }
+
+ Thread t = new Thread(this);
+ t.start();
+ }
+
+ public void run() {
+ long boundaryInMillis = outputTimeBoundary * 1000;
+ long currentTime = System.currentTimeMillis();
+ while (!Thread.interrupted()) {
+ long currentBoundary = (currentTime / boundaryInMillis)
+ * boundaryInMillis;
+ long interval = ((currentBoundary + boundaryInMillis) - System.currentTimeMillis());
+ if (interval > 0) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ currentTime = System.currentTimeMillis();
+
+ try {
+ output();
+ } catch (Exception e) {
+ Logger.getLogger("s4").error("Exception dumping persister", e);
+ }
+ }
+ }
+
+ public void output() {
+ File file = new File(dumpFilePrefix + UUID.randomUUID().toString());
+ Logger.getLogger("s4").info("Dumping to " + file);
+ FileOutputStream fos = null;
+ OutputStreamWriter osw = null;
+ BufferedWriter dumpWriter = null;
+
+ try {
+ fos = new FileOutputStream(file);
+ osw = new OutputStreamWriter(fos);
+ dumpWriter = new BufferedWriter(osw);
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe);
+ }
+
+ int savedPriority = Thread.currentThread().getPriority();
+ try {
+
+ Thread.currentThread().setPriority(Thread.MIN_PRIORITY);
+ Set<String> keys = new HashSet<String>();
+ for (String key : this.keySet()) {
+ keys.add(key);
+ }
+
+ for (String key : keys) {
+ Object value = this.get(key);
+ if (value == null) {
+ continue;
+ }
+ for (int patternId = 0; patternId < patterns.length; patternId++) {
+ Matcher m = patterns[patternId].matcher(key);
+ if (m.matches()) {
+ Object formattedValue = formatters[patternId].format(value);
+ dumpWriter.write(key + " = " + formattedValue + "\n");
+ }
+ }
+
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ Thread.currentThread().setPriority(savedPriority);
+ try {
+ dumpWriter.close();
+ } catch (Exception e) {
+ }
+ try {
+ osw.close();
+ } catch (Exception e) {
+ }
+ try {
+ fos.close();
+ } catch (Exception e) {
+ }
+ }
+ }
+}