You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/18 16:06:03 UTC
[1/2] git commit: Added checkpointing framework + tests - also
simplified some other tests - added optional checkpointing in twitter example
Updated Branches:
refs/heads/S4-11 [created] ab1fca5ec
Added checkpointing framework + tests
- also simplified some other tests
- added optional checkpointing in twitter example
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/ab1fca5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/ab1fca5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/ab1fca5e
Branch: refs/heads/S4-11
Commit: ab1fca5ec9bc8c60bde4e31afdc595b857dadf7f
Parents: ecbfd42
Author: Matthieu Morel <mm...@apache.org>
Authored: Wed Jul 18 18:02:55 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Wed Jul 18 18:02:55 2012 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/base/Event.java | 12 +
.../java/org/apache/s4/base/util/S4RLoader.java | 7 +-
.../java/org/apache/s4/comm/QueueingListener.java | 1 +
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 3 +-
.../java/org/apache/s4/comm/tcp/TCPListener.java | 3 +-
.../java/org/apache/s4/comm/udp/UDPListener.java | 1 +
.../java/org/apache/s4/fixtures/CommTestUtils.java | 29 +--
subprojects/s4-core/s4-core.gradle | 1 +
.../src/main/java/org/apache/s4/core/App.java | 18 +-
.../java/org/apache/s4/core/DefaultCoreModule.java | 6 +
.../src/main/java/org/apache/s4/core/Main.java | 2 +-
.../java/org/apache/s4/core/ProcessingElement.java | 223 +++++++++---
.../src/main/java/org/apache/s4/core/Stream.java | 7 +-
.../java/org/apache/s4/core/ft/CheckpointId.java | 124 +++++++
.../org/apache/s4/core/ft/CheckpointingConfig.java | 55 +++
.../apache/s4/core/ft/CheckpointingFramework.java | 52 +++
.../org/apache/s4/core/ft/CheckpointingTask.java | 33 ++
.../s4/core/ft/DefaultFileSystemStateStorage.java | 179 +++++++++
.../main/java/org/apache/s4/core/ft/FetchTask.java | 36 ++
.../ft/FileSystemBackendCheckpointingModule.java | 15 +
.../s4/core/ft/LoggingStorageCallbackFactory.java | 54 +++
.../s4/core/ft/NoOpCheckpointingFramework.java | 26 ++
.../java/org/apache/s4/core/ft/SafeKeeper.java | 282 +++++++++++++++
.../java/org/apache/s4/core/ft/SaveStateTask.java | 75 ++++
.../java/org/apache/s4/core/ft/SerializeTask.java | 46 +++
.../java/org/apache/s4/core/ft/StateStorage.java | 72 ++++
.../org/apache/s4/core/ft/StorageCallback.java | 37 ++
.../apache/s4/core/ft/StorageCallbackFactory.java | 35 ++
.../test/java/org/apache/s4/core/TriggerTest.java | 12 +-
...duleWithUnrespondingFetchingStorageBackend.java | 20 +
.../org/apache/s4/core/ft/CheckpointingTest.java | 153 ++++++++
.../java/org/apache/s4/core/ft/FTWordCountApp.java | 18 +
.../org/apache/s4/core/ft/FTWordCountTest.java | 131 +++++++
...ndWithZKStorageCallbackCheckpointingModule.java | 50 +++
.../java/org/apache/s4/core/ft/RecoveryTest.java | 158 ++++++++
.../core/ft/S4AppWithCountBasedCheckpointing.java | 24 ++
.../s4/core/ft/S4AppWithManualCheckpointing.java | 29 ++
.../core/ft/S4AppWithTimeBasedCheckpointing.java | 28 ++
.../java/org/apache/s4/core/ft/StatefulTestPE.java | 97 +++++
.../core/ft/StorageWithUnrespondingFetching.java | 45 +++
.../org/apache/s4/core/triggers/TriggerablePE.java | 6 +-
.../org/apache/s4/core/triggers/TriggeredApp.java | 12 +-
.../apache/s4/deploy/TestAutomaticDeployment.java | 62 ++--
.../java/org/apache/s4/fixtures/CoreTestUtils.java | 5 -
.../java/org/apache/s4/fixtures/SocketAdapter.java | 101 -----
.../org/apache/s4/wordcount/SentenceKeyFinder.java | 12 +-
.../java/org/apache/s4/wordcount/WordCountApp.java | 32 +--
.../org/apache/s4/wordcount/WordCountTest.java | 62 ++--
.../org/apache/s4/wordcount/WordSplitterPE.java | 15 +-
.../java/org/apache/s4/deploy/SocketAdapter.java | 80 ----
.../main/java/org/apache/s4/deploy/TestApp.java | 27 +-
.../org/apache/s4/example/twitter/TopNTopicPE.java | 4 +
.../s4/example/twitter/TopicCountAndReportPE.java | 8 +-
.../s4/example/twitter/TwitterCounterApp.java | 30 +-
54 files changed, 2235 insertions(+), 420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 7efadcc..1f5d7e4 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -188,4 +188,16 @@ public class Event {
}
}
+
+ @Override
+ public String toString() {
+ Map<String, String> attributesAsMap = getAttributesAsMap();
+ StringBuilder sb = new StringBuilder();
+ sb.append("[");
+ for (Map.Entry<String, String> entry : attributesAsMap.entrySet()) {
+ sb.append("{" + entry.getKey() + ";" + entry.getValue() + "},");
+ }
+ sb.append("]");
+ return sb.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
index f75a867..8d3e062 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
@@ -18,7 +18,12 @@ public class S4RLoader extends URLClassLoader {
}
public Class<?> loadGeneratedClass(String name, byte[] bytes) {
- return defineClass(name, bytes, 0, bytes.length);
+ Class<?> clazz = findLoadedClass(name);
+ if (clazz == null) {
+ return defineClass(name, bytes, 0, bytes.length);
+ } else {
+ return clazz;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
index 46f0a8e..2a9dc5c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
@@ -46,6 +46,7 @@ public class QueueingListener implements Listener, Runnable {
// System.out.println("QueueingListener: About to take message from queue");
return queue.take();
} catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 4472910..d2aa64f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -132,6 +132,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
} catch (InterruptedException ie) {
logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
clusterNode.getPort()));
+ Thread.currentThread().interrupt();
}
return false;
}
@@ -182,7 +183,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
bootstrap.releaseExternalResources();
} catch (InterruptedException ie) {
logger.error("Interrupted while closing");
- ie.printStackTrace();
+ Thread.currentThread().interrupt();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 3e905e6..d7d49bd 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -80,6 +80,7 @@ public class TCPListener implements Listener {
byte[] msg = handoffQueue.take();
return msg;
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
return null;
}
}
@@ -93,7 +94,7 @@ public class TCPListener implements Listener {
try {
channels.close().await();
} catch (InterruptedException e) {
- e.printStackTrace();
+ Thread.currentThread().interrupt();
}
bootstrap.releaseExternalResources();
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 316999c..43bb470 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -74,6 +74,7 @@ public class UDPListener implements Listener, Runnable {
try {
return handoffQueue.take();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
return null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 2791da6..63e32b5 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -5,9 +5,7 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.io.PrintWriter;
import java.net.InetSocketAddress;
-import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -41,11 +39,9 @@ public class CommTestUtils {
public static final int ZK_PORT = 2181;
public static final String ZK_STRING = "localhost:" + ZK_PORT;
- public static final int INITIAL_BOOKIE_PORT = 5000;
public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+ "storage");
- public static ServerSocket serverSocket;
static {
logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
}
@@ -233,6 +229,7 @@ public class CommTestUtils {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// ignore
}
}
@@ -281,6 +278,7 @@ public class CommTestUtils {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
// ignore
}
}
@@ -295,12 +293,6 @@ public class CommTestUtils {
}
- public static void stopSocketAdapter() throws IOException {
- if (serverSocket != null) {
- serverSocket.close();
- }
- }
-
/**
* gradle and eclipse have different directories for output files This is justified here
* http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
@@ -330,21 +322,4 @@ public class CommTestUtils {
}
- public static void injectIntoStringSocketAdapter(String string) throws IOException {
- Socket socket = null;
- PrintWriter writer = null;
- try {
- socket = new Socket("localhost", 12000);
- writer = new PrintWriter(socket.getOutputStream(), true);
- writer.println(string);
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(-1);
- } finally {
- if (socket != null) {
- socket.close();
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index ad3bbbe..c3a3f52 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -20,6 +20,7 @@ dependencies {
compile project(":s4-base")
compile project(":s4-comm")
compile project(path: ':s4-comm', configuration: 'tests')
+ compile libraries.commons_codec
compile libraries.jcommander
testCompile project(path: ':s4-comm', configuration: 'tests')
testCompile libraries.gradle_base_services
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0beec16..bc20e98 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,7 @@ import org.apache.s4.base.KeyFinder;
import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.comm.serialize.KryoSerDeser;
import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.ft.CheckpointingFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,8 +34,10 @@ import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.google.inject.name.Named;
-/*
- * Container base class to hold all processing elements. We will implement administrative methods here.
+/**
+ * Container base class to hold all processing elements.
+ *
+ * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
*/
public abstract class App {
@@ -73,6 +75,10 @@ public abstract class App {
@Named("cluster.name")
String clusterName;
+ // default is NoOpCheckpointingFramework
+ @Inject
+ CheckpointingFramework checkpointingFramework;
+
// serialization uses the application class loader
private SerializerDeserializer serDeser = new KryoSerDeser(getClass().getClassLoader());
@@ -111,7 +117,7 @@ public abstract class App {
}
/* Should only be used within the core package. */
- public void addStream(Streamable stream) {
+ public void addStream(Streamable<Event> stream) {
streams.add(stream);
}
@@ -262,6 +268,10 @@ public abstract class App {
return serDeser;
}
+ public CheckpointingFramework getCheckpointingFramework() {
+ return checkpointingFramework;
+ }
+
/**
* Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
* prototypes by key.
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index b740a21..d441d59 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -11,6 +11,8 @@ import org.apache.s4.base.SerializerDeserializer;
import org.apache.s4.base.util.S4RLoaderFactory;
import org.apache.s4.comm.DefaultHasher;
import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.ft.NoOpCheckpointingFramework;
import org.apache.s4.deploy.DeploymentManager;
import org.apache.s4.deploy.DistributedDeploymentManager;
import org.slf4j.Logger;
@@ -59,6 +61,10 @@ public class DefaultCoreModule extends AbstractModule {
bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
bind(S4RLoaderFactory.class);
+
+ // For enabling checkpointing, one needs to use a custom module, such as
+ // org.apache.s4.core.ft.FileSystemBackendCheckpointingModule
+ bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
}
private void loadProperties(Binder binder) {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 5f1375b..e01fd33 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -160,7 +160,7 @@ public class Main {
@Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
String appClass = null;
- @Parameter(names = "-extraModulesClasses", description = "additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
+ @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
List<String> extraModulesClasses = new ArrayList<String>();
@Parameter(names = { "-namedStringParameters", "-p" }, description = "Inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-namedStringParameters={name1=value1},{name2=value2} '", hidden = false, converter = InlineConfigParameterConverter.class)
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 4dced75..10734ca 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,20 +1,7 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
package org.apache.s4.core;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.Map;
import java.util.Timer;
@@ -25,6 +12,10 @@ import java.util.concurrent.TimeUnit;
import net.jcip.annotations.ThreadSafe;
import org.apache.s4.base.Event;
+import org.apache.s4.core.ft.CheckpointId;
+import org.apache.s4.core.ft.CheckpointingConfig;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.apache.s4.core.ft.CheckpointingTask;
import org.apache.s4.core.gen.OverloadDispatcher;
import org.apache.s4.core.gen.OverloadDispatcherGenerator;
import org.slf4j.Logger;
@@ -34,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Maps;
@@ -94,34 +86,41 @@ import com.google.common.collect.Maps;
*/
public abstract class ProcessingElement implements Cloneable {
- private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
- private static final String SINGLETON = "singleton";
+ transient private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
+ transient private static final String SINGLETON = "singleton";
- protected App app;
+ transient protected App app;
/*
* This maps holds all the instances. We make it package private to prevent concrete classes from updating the
* collection.
*/
- Cache<String, ProcessingElement> peInstances;
+ transient Cache<String, ProcessingElement> peInstances;
/* This map is initialized in the prototype and cloned to instances. */
- Map<Class<? extends Event>, Trigger> triggers;
+ transient Map<Class<? extends Event>, Trigger> triggers;
/* PE instance id. */
String id = "";
/* Private fields. */
- private ProcessingElement pePrototype;
- private boolean haveTriggers = false;
- private long timerIntervalInMilliseconds = 0;
- private Timer timer;
- private boolean isPrototype = true;
- private boolean isThreadSafe = false;
- private String name = null;
- private boolean isSingleton = false;
-
- private transient OverloadDispatcher overloadDispatcher;
+ transient private ProcessingElement pePrototype;
+ transient private boolean haveTriggers = false;
+ transient private long timerIntervalInMilliseconds = 0;
+ transient private Timer triggerTimer;
+ transient private Timer checkpointingTimer;
+ transient private boolean isPrototype = true;
+ transient private boolean isThreadSafe = false;
+ transient private String name = null;
+ transient private boolean isSingleton = false;
+ transient long eventCount = 0;
+
+ transient private OverloadDispatcher overloadDispatcher;
+ transient private boolean recoveryAttempted = false;
+ transient private boolean dirty = false;
+
+ transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE)
+ .build();
protected ProcessingElement() {
OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
@@ -211,7 +210,7 @@ public abstract class ProcessingElement implements Cloneable {
return peInstances.size();
}
- Map<String, ProcessingElement> getPEInstances() {
+ public Map<String, ProcessingElement> getPEInstances() {
return peInstances.asMap();
}
@@ -314,8 +313,9 @@ public abstract class ProcessingElement implements Cloneable {
/* Skip trigger checking overhead if there are no triggers. */
haveTriggers = true;
- if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS)
+ if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS) {
interval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
+ }
Trigger config = new Trigger(numEvents, interval);
@@ -374,14 +374,14 @@ public abstract class ProcessingElement implements Cloneable {
Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
- if (timer != null) {
- timer.cancel();
+ if (triggerTimer != null) {
+ triggerTimer.cancel();
}
if (interval == 0)
return this;
- timer = new Timer();
+ triggerTimer = new Timer();
return this;
}
@@ -406,8 +406,11 @@ public abstract class ProcessingElement implements Cloneable {
} else {
object = this;
}
-
synchronized (object) {
+ if (!recoveryAttempted) {
+ recover();
+ recoveryAttempted = true;
+ }
/* Dispatch onEvent() method. */
overloadDispatcher.dispatchEvent(this, event);
@@ -416,9 +419,28 @@ public abstract class ProcessingElement implements Cloneable {
if (haveTriggers && isTrigger(event)) {
overloadDispatcher.dispatchTrigger(this, event);
}
+
+ eventCount++;
+
+ setDirty(true);
+
+ if (isCheckpointable()) {
+ checkpoint();
+ }
}
}
+ protected boolean isCheckpointable() {
+ return getApp().checkpointingFramework.isCheckpointable(this);
+ }
+
+ public void checkpoint() {
+
+ getApp().getCheckpointingFramework().saveState(this);
+ // remove dirty flag
+ dirty = false;
+ }
+
private boolean isTrigger(Event event) {
return isTrigger(event, event.getClass());
}
@@ -466,8 +488,8 @@ public abstract class ProcessingElement implements Cloneable {
protected void removeAll() {
/* Close resources in prototype. */
- if (timer != null) {
- timer.cancel();
+ if (triggerTimer != null) {
+ triggerTimer.cancel();
logger.info("Timer stopped.");
}
@@ -503,12 +525,22 @@ public abstract class ProcessingElement implements Cloneable {
}
/* Start timer. */
- if (timer != null) {
- timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
- logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
+ if (triggerTimer != null) {
+ triggerTimer
+ .scheduleAtFixedRate(new OnTimeTask(), timerIntervalInMilliseconds, timerIntervalInMilliseconds);
+ logger.debug("Started trigger timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
}
+ if (checkpointingConfig.mode == CheckpointingMode.TIME) {
+ checkpointingTimer = new Timer();
+ checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this),
+ checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency),
+ checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency));
+ logger.debug("Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}].",
+ new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency) });
+ }
+
/* Check if this PE is annotated as thread safe. */
if (getClass().isAnnotationPresent(ThreadSafe.class) == true) {
@@ -535,7 +567,7 @@ public abstract class ProcessingElement implements Cloneable {
}
return peInstances.get(id);
} catch (ExecutionException e) {
- logger.error("Problem when trying to create a PE instance.", e);
+ logger.error("Problem when trying to create a PE instance for id {}", id, e);
}
return null;
}
@@ -544,8 +576,16 @@ public abstract class ProcessingElement implements Cloneable {
* Get all the local instances. See notes in {@link #getInstanceForKey(String) getLocalInstanceForKey}
*/
public Collection<ProcessingElement> getInstances() {
-
- return peInstances.asMap().values();
+ try {
+ if (isSingleton) {
+ return ImmutableList.of(peInstances.get(SINGLETON));
+ } else {
+ return peInstances.asMap().values();
+ }
+ } catch (ExecutionException e) {
+ logger.error("Problem when trying to create a PE instance for id {}", id, e);
+ return null;
+ }
}
/**
@@ -672,6 +712,64 @@ public abstract class ProcessingElement implements Cloneable {
app.peByName.put(name, this);
}
+ public byte[] serializeState() {
+ return getApp().getSerDeser().serialize(this);
+ }
+
+ public ProcessingElement deserializeState(byte[] loadedState) {
+ return (ProcessingElement) getApp().getSerDeser().deserialize(loadedState);
+ }
+
+ public void restoreState(ProcessingElement oldState) {
+ restoreFieldsForClass(oldState.getClass(), oldState);
+ }
+
+ protected void recover() {
+ byte[] serializedState = null;
+ try {
+ serializedState = getApp().getCheckpointingFramework().fetchSerializedState(new CheckpointId(this));
+ } catch (RuntimeException e) {
+ logger.error("Cannot fetch serialized stated for [{}/{}]: {}", new String[] {
+ getPrototype().getClass().getName(), getId(), e.getMessage() });
+ }
+ if (serializedState == null) {
+ return;
+ }
+ try {
+ ProcessingElement peInOldState = deserializeState(serializedState);
+ restoreState(peInOldState);
+ } catch (RuntimeException e) {
+ logger.error("Cannot restore state for key [" + new CheckpointId(this) + "]: " + e.getMessage(), e);
+ }
+ }
+
+ private void restoreFieldsForClass(Class<?> currentInOldStateClassHierarchy, ProcessingElement oldState) {
+ if (!ProcessingElement.class.isAssignableFrom(currentInOldStateClassHierarchy)) {
+ return;
+ } else {
+ Field[] fields = oldState.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
+ if (!Modifier.isPublic(field.getModifiers())) {
+ field.setAccessible(true);
+ }
+ try {
+ // TODO use reflectasm
+ field.set(this, field.get(oldState));
+ } catch (IllegalArgumentException e) {
+ logger.error("Cannot recover old state for this PE [{}]", e);
+ return;
+ } catch (IllegalAccessException e) {
+ logger.error("Cannot recover old state for this PE [{}]", e);
+ return;
+ }
+
+ }
+ }
+ restoreFieldsForClass(currentInOldStateClassHierarchy.getSuperclass(), oldState);
+ }
+ }
+
class Trigger {
final long intervalInMilliseconds;
final int intervalInEvents;
@@ -709,4 +807,41 @@ public abstract class ProcessingElement implements Cloneable {
return active;
}
}
+
+ public long getEventCount() {
+ return eventCount;
+ }
+
+ public boolean isDirty() {
+ return dirty;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder(getClass().getName() + "/" + getId() + " ;");
+ if (isSingleton) {
+ sb.append("singleton ;");
+ }
+ sb.append(isThreadSafe ? "IS thread-safe ;" : "Not thread-safe ;");
+ sb.append("timerInterval=" + timerIntervalInMilliseconds + " ;");
+ return sb.toString();
+
+ }
+
+ public CheckpointingConfig getCheckpointingConfig() {
+ return checkpointingConfig;
+ }
+
+ public void setCheckpointingConfig(CheckpointingConfig checkpointingConfig) {
+ this.checkpointingConfig = checkpointingConfig;
+ }
+
+ /**
+ * By default, the state of the PE instance is considered dirty whenever it processes an event. Some events may
+ * actually leave the state of the PE unchanged. The setDirty() method can therefore be overridden to accommodate
+ * user-specific behaviors.
+ */
+ public void setDirty(boolean dirty) {
+ this.dirty = dirty;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index e01191b..b76102a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -187,9 +187,8 @@ public class Stream<T extends Event> implements Runnable, Streamable {
.serialize(event)));
}
} catch (InterruptedException e) {
- e.printStackTrace();
logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- System.exit(-1);
+ Thread.currentThread().interrupt();
}
}
@@ -201,9 +200,8 @@ public class Stream<T extends Event> implements Runnable, Streamable {
try {
queue.put(event);
} catch (InterruptedException e) {
- e.printStackTrace();
logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
- System.exit(-1);
+ Thread.currentThread().interrupt();
}
}
@@ -305,6 +303,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
} catch (InterruptedException e) {
logger.info("Closing stream {}.", name);
receiver.removeStream(this);
+ Thread.currentThread().interrupt();
return;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
new file mode 100644
index 0000000..290d46a
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * <p>
+ * Identifier of PEs. It is used to identify checkpointed PEs in the storage backend.
+ * </p>
+ * <p>
+ * The storage backend is responsible for converting this identifier to whatever internal representation is most
+ * suitable for it.
+ * </p>
+ * <p>
+ * This class provides methods for getting a compact String representation of the identifier and for creating an
+ * identifier from a String representation.
+ * </p>
+ *
+ */
+ public class CheckpointId {
+
+     private String prototypeId;
+     private String key;
+
+     /** Compact form <code>[prototypeId];[key]</code>, as produced by {@link #getStringRepresentation()}. */
+     private static final Pattern STRING_REPRESENTATION_PATTERN = Pattern.compile("\\[(\\S*)\\];\\[(\\S*)\\]");
+
+     /**
+      * Creates an identifier with neither prototype id nor key.
+      */
+     public CheckpointId() {
+     }
+
+     /**
+      * Creates the identifier of a PE instance: the prototype id is the class name of the PE prototype, and the key is
+      * the id of the PE instance.
+      *
+      * @param pe
+      *            processing element to identify
+      */
+     public CheckpointId(ProcessingElement pe) {
+         this.prototypeId = pe.getPrototype().getClass().getName();
+         this.key = pe.getId();
+     }
+
+     /**
+      *
+      * @param prototypeID
+      *            identifier of the PE prototype (the PE-based constructor uses the class name of the prototype)
+      * @param key
+      *            keyed attribute(s) (the PE-based constructor uses the value returned by
+      *            {@link ProcessingElement#getId() getId()})
+      */
+     public CheckpointId(String prototypeID, String key) {
+         super();
+         this.prototypeId = prototypeID;
+         this.key = key;
+     }
+
+     /**
+      * Parses an identifier from its compact form, see {@link #getStringRepresentation()}. Empty parts are mapped to
+      * <code>null</code>. When the input does not contain the compact form, both the prototype id and the key are
+      * left <code>null</code>.
+      *
+      * @param keyAsString
+      *            compact string representation of an identifier
+      */
+     public CheckpointId(String keyAsString) {
+         Matcher matcher = STRING_REPRESENTATION_PATTERN.matcher(keyAsString);
+
+         // Matcher#group throws IllegalStateException when no match was found, therefore the result of find() must be
+         // checked before reading the groups
+         if (matcher.find()) {
+             prototypeId = emptyToNull(matcher.group(1));
+             key = emptyToNull(matcher.group(2));
+         }
+     }
+
+     public String getKey() {
+         return key;
+     }
+
+     public String getPrototypeId() {
+         return prototypeId;
+     }
+
+     @Override
+     public String toString() {
+         return "[PROTO_ID];[KEY] --> " + getStringRepresentation();
+     }
+
+     /**
+      * @return the compact form <code>[prototypeId];[key]</code>, where a <code>null</code> part is written as an
+      *         empty string
+      */
+     public String getStringRepresentation() {
+         return "[" + (prototypeId == null ? "" : prototypeId) + "];[" + (key == null ? "" : key) + "]";
+     }
+
+     @Override
+     public int hashCode() {
+         return getStringRepresentation().hashCode();
+     }
+
+     @Override
+     public boolean equals(Object obj) {
+         if (this == obj) {
+             return true;
+         }
+         if ((obj == null) || (getClass() != obj.getClass())) {
+             return false;
+         }
+
+         CheckpointId other = (CheckpointId) obj;
+         return sameValue(key, other.key) && sameValue(prototypeId, other.prototypeId);
+     }
+
+     /** Null-safe equality of two strings. */
+     private static boolean sameValue(String first, String second) {
+         return first == null ? second == null : first.equals(second);
+     }
+
+     /** Maps the empty string to <code>null</code>, other values are returned unchanged. */
+     private static String emptyToNull(String value) {
+         return "".equals(value) ? null : value;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
new file mode 100644
index 0000000..eec765d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
@@ -0,0 +1,55 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checkpointing configuration: event count based vs time interval, frequency. Use the {@link Builder} class to build
+ * instances.
+ *
+ */
+ public class CheckpointingConfig {
+
+     /**
+      * The kind of checkpointing: at regular time intervals, every given number of events, or disabled.
+      */
+     public enum CheckpointingMode {
+         TIME, EVENT_COUNT, NONE
+     }
+
+     public final CheckpointingMode mode;
+     public final int frequency;
+     public final TimeUnit timeUnit;
+
+     /** Instances are only created through {@link Builder#build()}. */
+     private CheckpointingConfig(Builder settings) {
+         mode = settings.selectedMode;
+         frequency = settings.selectedFrequency;
+         timeUnit = settings.selectedTimeUnit;
+     }
+
+     /**
+      * Fluent builder of {@link CheckpointingConfig} instances. Unless specified otherwise, the frequency is 0 and the
+      * time unit is milliseconds.
+      */
+     public static class Builder {
+         private CheckpointingMode selectedMode;
+         private int selectedFrequency;
+         private TimeUnit selectedTimeUnit = TimeUnit.MILLISECONDS;
+
+         public Builder(CheckpointingMode mode) {
+             selectedMode = mode;
+         }
+
+         public Builder frequency(int frequency) {
+             selectedFrequency = frequency;
+             return this;
+         }
+
+         public Builder timeUnit(TimeUnit timeUnit) {
+             selectedTimeUnit = timeUnit;
+             return this;
+         }
+
+         public CheckpointingConfig build() {
+             return new CheckpointingConfig(this);
+         }
+
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
new file mode 100644
index 0000000..48b4616
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
@@ -0,0 +1,52 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+import com.google.inject.ImplementedBy;
+
+/**
+ *
+ * This interface defines the functionalities offered by the checkpointing framework.
+ *
+ */
+ @ImplementedBy(value = NoOpCheckpointingFramework.class)
+ public interface CheckpointingFramework {
+
+ /**
+ * Serializes and stores state to the storage backend. Serialization and storage operations are asynchronous.
+ *
+ * @param pe
+ * processing element whose state must be checkpointed
+ * @return a callback for getting notified of the result of the storage operation
+ */
+ StorageCallback saveState(ProcessingElement pe);
+
+ /**
+ * Fetches checkpoint data from storage for a given PE
+ *
+ * @param key
+ * identifier of the checkpointed PE
+ * @return checkpoint data
+ */
+ byte[] fetchSerializedState(CheckpointId key);
+
+ /**
+ * Evaluates whether specified PE should be checkpointed, based on:
+ * <ul>
+ * <li>whether checkpointing is enabled</li>
+ * <li>whether the pe is "dirty"</li>
+ * <li>the checkpointing frequency settings</li>
+ * </ul>
+ *
+ * This is used for count-based checkpointing intervals. Time-based checkpointing relies on the dirty flag when
+ * triggered.
+ *
+ * @param pe
+ * processing element to evaluate
+ * @return true if checkpointable, according to the above requirements
+ */
+ boolean isCheckpointable(ProcessingElement pe);
+
+ /**
+ * Result code of a storage operation.
+ */
+ public enum StorageResultCode {
+ SUCCESS, FAILURE
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
new file mode 100644
index 0000000..dd6003c
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
@@ -0,0 +1,33 @@
+package org.apache.s4.core.ft;
+
+import java.util.Map;
+import java.util.TimerTask;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * When checkpointing at regular time intervals, this class is used to actually perform the checkpoints. It iterates
+ * among all instances of the specified prototype, and checkpoints every eligible instance.
+ *
+ */
+ public class CheckpointingTask extends TimerTask {
+
+     ProcessingElement prototype;
+
+     /**
+      * @param prototype
+      *            prototype whose PE instances are checkpointed by this task
+      */
+     public CheckpointingTask(ProcessingElement prototype) {
+         super();
+         this.prototype = prototype;
+     }
+
+     /**
+      * Checkpoints every dirty instance of the prototype. The dirty check and the checkpoint request are performed
+      * while holding the lock of the instance.
+      */
+     @Override
+     public void run() {
+         for (ProcessingElement instance : prototype.getPEInstances().values()) {
+             synchronized (instance) {
+                 if (instance.isDirty()) {
+                     instance.checkpoint();
+                 }
+             }
+         }
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
new file mode 100644
index 0000000..3a95da6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Implementation of a file system backend storage to persist checkpoints.
+ * </p>
+ * <p>
+ * The file system may be the default local file system when running on a single machine, but should be a distributed
+ * file system such as NFS when running on a cluster.
+ * </p>
+ * <p>
+ * Checkpoints are stored in individual files (1 file = 1 safeKeeperId) in directories according to the following
+ * structure: <code>(storageRootpath)/prototypeId/safeKeeperId</code>
+ * </p>
+ *
+ */
+ public class DefaultFileSystemStateStorage implements StateStorage {
+
+     private static Logger logger = LoggerFactory.getLogger(DefaultFileSystemStateStorage.class);
+
+     /** Accepts directories only: there is one directory per prototype id under the storage root. */
+     private static final FileFilter DIRECTORIES_ONLY = new FileFilter() {
+         @Override
+         public boolean accept(File file) {
+             return file.isDirectory();
+         }
+     };
+
+     /** Accepts regular files only: there is one file per checkpoint. */
+     private static final FileFilter REGULAR_FILES_ONLY = new FileFilter() {
+         @Override
+         public boolean accept(File file) {
+             return file.isFile();
+         }
+     };
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.filesystem.storageRootPath")
+     String storageRootPath;
+
+     public DefaultFileSystemStateStorage() {
+     }
+
+     /**
+      * <p>
+      * Called by the dependency injection framework, after construction.
+      * </p>
+      */
+     @Inject
+     public void init() {
+         checkStorageDir();
+     }
+
+     /**
+      * Reads the checkpoint file corresponding to the specified identifier.
+      *
+      * @param key
+      *            identifier of the checkpoint
+      * @return the content of the checkpoint file, or <code>null</code> when the file does not exist or cannot be read
+      */
+     @Override
+     public byte[] fetchState(CheckpointId key) {
+         File file = safeKeeperID2File(key, storageRootPath);
+         if (!file.exists()) {
+             return null;
+         }
+         logger.debug("Fetching {} for : {}", file.getAbsolutePath(), key);
+
+         try {
+             return Files.toByteArray(file);
+         } catch (IOException e) {
+             logger.error("Cannot read content from checkpoint file [" + file.getAbsolutePath() + "]", e);
+             return null;
+         }
+     }
+
+     /**
+      * Lists the identifiers of all checkpoints found under the storage root directory.
+      *
+      * @return identifiers of the stored checkpoints; empty when the storage root directory cannot be listed
+      */
+     @Override
+     public Set<CheckpointId> fetchStoredKeys() {
+         Set<CheckpointId> keys = new HashSet<CheckpointId>();
+         // File#listFiles returns null when the path is not a directory or when an I/O error occurs
+         File[] prototypeDirs = new File(storageRootPath).listFiles(DIRECTORIES_ONLY);
+         if (prototypeDirs == null) {
+             logger.warn("Cannot list content of checkpoint storage directory [{}]", storageRootPath);
+             return keys;
+         }
+         for (File prototypeDir : prototypeDirs) {
+             File[] checkpointFiles = prototypeDir.listFiles(REGULAR_FILES_ONLY);
+             if (checkpointFiles == null) {
+                 logger.warn("Cannot list content of checkpoint directory [{}]", prototypeDir.getAbsolutePath());
+                 continue;
+             }
+             for (File checkpointFile : checkpointFiles) {
+                 keys.add(file2SafeKeeperID(checkpointFile));
+             }
+         }
+         return keys;
+     }
+
+     // files kept as : root/<prototypeId>/encodedKeyWithFullInfo
+     private static File safeKeeperID2File(CheckpointId key, String storageRootPath) {
+
+         return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
+                 + Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes()));
+     }
+
+     // the file name is the Base64 encoding of the string representation of the identifier
+     private static CheckpointId file2SafeKeeperID(File file) {
+         return new CheckpointId(new String(Base64.decodeBase64(file.getName())));
+     }
+
+     /**
+      * When no storage root path was specified, falls back to <code>(user.dir)/tmp/storage</code> and creates that
+      * directory if needed.
+      */
+     public void checkStorageDir() {
+         if (storageRootPath == null) {
+
+             File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+                     + "storage");
+             storageRootPath = defaultStorageDir.getAbsolutePath();
+             logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
+             if (!defaultStorageDir.exists()) {
+                 if (!(defaultStorageDir.mkdirs())) {
+                     logger.error("Storage directory not specified, and cannot create default storage directory : "
+                             + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+                 }
+             }
+         }
+     }
+
+     /**
+      * Writes the checkpoint to its file, replacing a previously saved checkpoint for the same identifier. The
+      * outcome is reported through the callback.
+      *
+      * @param key
+      *            identifier of the checkpoint
+      * @param state
+      *            serialized state to store
+      * @param callback
+      *            notified of the success or failure of the operation
+      */
+     @Override
+     public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
+         File f = safeKeeperID2File(key, storageRootPath);
+         if (logger.isDebugEnabled()) {
+             logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
+         }
+         if (!f.exists()) {
+             if (!f.getParentFile().exists()) {
+                 // parent file has prototype id
+                 if (!f.getParentFile().mkdirs()) {
+                     callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                             "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
+                                     + f.getParentFile().getAbsolutePath());
+                     return;
+                 }
+             }
+         } else {
+             if (!f.delete()) {
+                 // report the checkpoint file itself, not its parent directory
+                 callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                         "Cannot delete previously saved checkpoint file [" + f.getAbsolutePath() + "]");
+                 return;
+             }
+         }
+
+         try {
+             Files.write(state, f);
+             callback.storageOperationResult(CheckpointingFramework.StorageResultCode.SUCCESS, key.toString());
+         } catch (IOException e) {
+             // also covers FileNotFoundException, which is an IOException
+             logger.error("Cannot write checkpoint file [" + f.getAbsolutePath() + "]", e);
+             callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE, key.toString() + " : "
+                     + e.getMessage());
+         }
+
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
new file mode 100644
index 0000000..6994acd
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
@@ -0,0 +1,36 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a checkpoint fetching operation.
+ *
+ */
+ public class FetchTask implements Callable<byte[]> {
+
+     private static Logger logger = LoggerFactory.getLogger(FetchTask.class);
+
+     StateStorage stateStorage;
+     CheckpointId checkpointId;
+
+     /**
+      * @param stateStorage
+      *            storage backend to fetch from
+      * @param checkpointId
+      *            identifier of the checkpoint to fetch
+      */
+     public FetchTask(StateStorage stateStorage, CheckpointId checkpointId) {
+         this.stateStorage = stateStorage;
+         this.checkpointId = checkpointId;
+     }
+
+     /**
+      * Fetches the checkpoint data from the storage backend. Failures are logged, then propagated to the caller.
+      */
+     @Override
+     public byte[] call() throws Exception {
+         try {
+             return stateStorage.fetchState(checkpointId);
+         } catch (Exception failure) {
+             logger.error("Cannot fetch checkpoint data for {}", checkpointId, failure);
+             throw failure;
+         }
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
new file mode 100644
index 0000000..dd984e6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -0,0 +1,15 @@
+package org.apache.s4.core.ft;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Checkpointing module that uses the {@link DefaultFileSystemStateStorage} as a checkpointing backend.
+ *
+ */
+ public class FileSystemBackendCheckpointingModule extends AbstractModule {
+     @Override
+     protected void configure() {
+         // checkpointing framework implementation
+         bind(CheckpointingFramework.class).to(SafeKeeper.class);
+         // storage backend for the checkpoints
+         bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
new file mode 100644
index 0000000..28279ba
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
+
+/**
+ * A factory for creating storage callbacks that simply log callback results
+ *
+ */
+ public class LoggingStorageCallbackFactory implements StorageCallbackFactory {
+
+     @Override
+     public StorageCallback createStorageCallback() {
+         return new StorageCallbackLogger();
+     }
+
+     /**
+      * Storage callback that only logs the outcome of storage operations: successes at debug level, anything else at
+      * warn level.
+      *
+      */
+     static class StorageCallbackLogger implements StorageCallback {
+
+         private static Logger logger = Logger.getLogger("s4-ft");
+
+         @Override
+         public void storageOperationResult(StorageResultCode code, Object message) {
+             if (StorageResultCode.SUCCESS != code) {
+                 logger.warn("Callback from storage: " + StorageResultCode.FAILURE.name() + " : " + message);
+                 return;
+             }
+             if (logger.isDebugEnabled()) {
+                 logger.debug("Callback from storage: " + StorageResultCode.SUCCESS.name() + " : " + message);
+             }
+         }
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
new file mode 100644
index 0000000..10387ce
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
@@ -0,0 +1,26 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Implementation of {@link CheckpointingFramework} that does NO checkpointing.
+ *
+ */
+ public final class NoOpCheckpointingFramework implements CheckpointingFramework {
+
+ /**
+ * Does not save anything.
+ *
+ * @return always <code>null</code>
+ */
+ @Override
+ public StorageCallback saveState(ProcessingElement pe) {
+ return null;
+ }
+
+ /**
+ * Does not fetch anything.
+ *
+ * @return always <code>null</code>
+ */
+ @Override
+ public byte[] fetchSerializedState(CheckpointId key) {
+ return null;
+ }
+
+ /**
+ * @return always <code>false</code>
+ */
+ @Override
+ public boolean isCheckpointable(ProcessingElement pe) {
+ return false;
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
new file mode 100644
index 0000000..4c8358f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
@@ -0,0 +1,282 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ *
+ * <p>
+ * This class is responsible for coordinating interactions between the S4 event processor and the checkpoint storage
+ * backend. In particular, it schedules asynchronous save tasks to be executed on the backend.
+ * </p>
+ *
+ *
+ *
+ */
+ public final class SafeKeeper implements CheckpointingFramework {
+
+     /** Logs exceptions that escape from the threads of the checkpointing thread pools. */
+     private static final class UncaughtExceptionLogger implements UncaughtExceptionHandler {
+         String operationType;
+
+         public UncaughtExceptionLogger(String operationType) {
+             this.operationType = operationType;
+         }
+
+         @Override
+         public void uncaughtException(Thread t, Throwable e) {
+             logger.error("Cannot execute checkpointing " + operationType + " operation", e);
+         }
+     }
+
+     private static Logger logger = LoggerFactory.getLogger(SafeKeeper.class);
+
+     @Inject
+     private StateStorage stateStorage;
+     @Inject(optional = true)
+     private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
+
+     private ThreadPoolExecutor storageThreadPool;
+     private ThreadPoolExecutor serializationThreadPool;
+     private ThreadPoolExecutor fetchingThreadPool;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.storageMaxThreads")
+     int storageMaxThreads = 1;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.storageThreadKeepAliveSeconds")
+     int storageThreadKeepAliveSeconds = 120;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.storageMaxOutstandingRequests")
+     int storageMaxOutstandingRequests = 1000;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.serializationMaxThreads")
+     int serializationMaxThreads = 1;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.serializationThreadKeepAliveSeconds")
+     int serializationThreadKeepAliveSeconds = 120;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.serializationMaxOutstandingRequests")
+     int serializationMaxOutstandingRequests = 1000;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.maxSerializationLockTime")
+     long maxSerializationLockTime = 1000;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.fetchingMaxThreads")
+     int fetchingMaxThreads = 1;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.fetchingThreadKeepAliveSeconds")
+     int fetchingThreadKeepAliveSeconds = 120;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.fetchingMaxWaitMs")
+     long fetchingMaxWaitMs = 1000;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.fetchingMaxConsecutiveFailuresBeforeDisabling")
+     int fetchingMaxConsecutiveFailuresBeforeDisabling = 10;
+
+     @Inject(optional = true)
+     @Named("s4.checkpointing.fetchingDisabledDurationMs")
+     long fetchingDisabledDurationMs = 600000;
+
+     // time at which fetching was last disabled, in ms since the epoch; -1 when it was never disabled
+     long fetchingDisabledInitTime = -1;
+     AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger();
+
+     public SafeKeeper() {
+     }
+
+     /**
+      * Creates the thread pools used for serializing, storing and fetching checkpoints. Annotated with
+      * <code>@Inject</code> so that it is called by the dependency injection framework.
+      */
+     @Inject
+     private void init() {
+
+         // NOTE: those thread pools should be fine tuned according to backend and application load/requirements.
+         // For now:
+         // - number of threads and work queue size have overridable defaults
+         // - failures are logged
+         // - when storage queue is full, we throttle backwards to the serialization threadpool
+         // - when serialization queue is full, we abort execution for new entries
+         // - fetching uses a synchronous queue and therefore is a blocking operation, with a timeout
+
+         ThreadFactory storageThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-storage-%d")
+                 .setUncaughtExceptionHandler(new UncaughtExceptionLogger("storage")).build();
+         storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds,
+                 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests),
+                 storageThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
+         storageThreadPool.allowCoreThreadTimeOut(true);
+
+         ThreadFactory serializationThreadFactory = new ThreadFactoryBuilder()
+                 .setNameFormat("Checkpointing-serialization-%d")
+                 .setUncaughtExceptionHandler(new UncaughtExceptionLogger("serialization")).build();
+         serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads,
+                 serializationThreadKeepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
+                         serializationMaxOutstandingRequests), serializationThreadFactory,
+                 new ThreadPoolExecutor.AbortPolicy());
+         serializationThreadPool.allowCoreThreadTimeOut(true);
+
+         ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d")
+                 .setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build();
+         fetchingThreadPool = new ThreadPoolExecutor(1, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
+                 TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), fetchingThreadFactory);
+         fetchingThreadPool.allowCoreThreadTimeOut(true);
+
+     }
+
+     /**
+      * Submits the serialization of the PE state, then a storage task that consumes the serialized state. When the
+      * serialization request is rejected, the failure is reported through the returned callback and nothing is stored.
+      *
+      * @see org.apache.s4.core.ft.CheckpointingFramework#saveState(org.apache.s4.core.ProcessingElement)
+      */
+     @Override
+     public StorageCallback saveState(ProcessingElement pe) {
+         StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+         Future<byte[]> futureSerializedState = null;
+         try {
+             futureSerializedState = serializeState(pe);
+         } catch (RejectedExecutionException e) {
+             // TODO metrics: count checkpoints dropped from the serialization queue
+             // the serialization pool uses the abort policy: it is the new request that is rejected
+             storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                     "Serialization task queue is full. Checkpoint request for PE [" + pe.getId()
+                             + "] was rejected. Remaining capacity for the serialization task queue is ["
+                             + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                             + serializationThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                             + serializationMaxOutstandingRequests + "]");
+             return storageCallback;
+         }
+         submitSaveStateTask(new SaveStateTask(new CheckpointId(pe), futureSerializedState, storageCallback,
+                 stateStorage), storageCallback);
+         return storageCallback;
+     }
+
+     /** Submits the serialization of the PE state; throws RejectedExecutionException when the request is rejected. */
+     private Future<byte[]> serializeState(ProcessingElement pe) {
+         // TODO metrics: count checkpoints added to the serialization queue
+         return serializationThreadPool.submit(new SerializeTask(pe));
+     }
+
+     /** Hands the storage task over to the storage thread pool, reporting a rejection through the callback. */
+     private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
+         try {
+             storageThreadPool.execute(task);
+             // TODO metrics: count checkpoints added to the storage queue
+         } catch (RejectedExecutionException e) {
+             // TODO metrics: count checkpoints dropped from the storage queue
+             // NOTE(review): the storage pool uses the caller-runs policy, which does not throw on rejection; this
+             // handler is kept as a safeguard in case the policy changes
+             storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                     "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
+                             + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                             + storageThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                             + storageMaxOutstandingRequests + "]");
+         }
+     }
+
+     /**
+      * Fetches checkpoint data through the fetching thread pool, waiting at most <code>fetchingMaxWaitMs</code>.
+      * After <code>fetchingMaxConsecutiveFailuresBeforeDisabling</code> consecutive failures, fetching is disabled
+      * for <code>fetchingDisabledDurationMs</code>: during that period this method returns <code>null</code> without
+      * contacting the backend.
+      *
+      * @return the checkpoint data, or <code>null</code> when there is none, when fetching failed or when fetching is
+      *         temporarily disabled
+      *
+      * @see org.apache.s4.core.ft.CheckpointingFramework#fetchSerializedState(org.apache.s4.core.ft.CheckpointId)
+      */
+     @Override
+     public byte[] fetchSerializedState(CheckpointId key) {
+
+         if (fetchingCurrentConsecutiveFailures.get() >= fetchingMaxConsecutiveFailuresBeforeDisabling) {
+             if (System.currentTimeMillis() < (fetchingDisabledInitTime + fetchingDisabledDurationMs)) {
+                 // still within the disabled period
+                 return null;
+             }
+             // disabled period elapsed, reinit
+             fetchingCurrentConsecutiveFailures.set(0);
+         }
+
+         Future<byte[]> fetched;
+         try {
+             fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, key));
+         } catch (RejectedExecutionException e) {
+             // synchronous queue + bounded number of threads: the request is rejected when all threads are busy
+             logger.error("Cannot fetch checkpoint from backend for key [{}] because no fetching thread is available",
+                     key.getStringRepresentation());
+             recordFetchFailure();
+             return null;
+         }
+
+         try {
+             byte[] result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
+             fetchingCurrentConsecutiveFailures.set(0);
+             return result;
+         } catch (TimeoutException te) {
+             // do not let the abandoned fetch keep a fetching thread busy
+             fetched.cancel(true);
+             logger.error("Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms",
+                     key.getStringRepresentation(), fetchingMaxWaitMs);
+         } catch (InterruptedException e) {
+             logger.error(
+                     "Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms because of an interruption",
+                     key.getStringRepresentation(), fetchingMaxWaitMs);
+             Thread.currentThread().interrupt();
+         } catch (ExecutionException e) {
+             logger.error("Cannot fetch checkpoint from backend for key [{}] due to {}", key.getStringRepresentation(),
+                     e.getCause().getClass().getName() + "/" + e.getCause().getMessage());
+         }
+         recordFetchFailure();
+         return null;
+     }
+
+     /** Counts a fetching failure and starts the disabled period when the maximum of consecutive failures is reached. */
+     private void recordFetchFailure() {
+         if (fetchingCurrentConsecutiveFailures.incrementAndGet() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
+             logger.trace(
+                     "Due to {} successive checkpoint fetching failures, fetching is temporarily disabled for {} ms",
+                     fetchingMaxConsecutiveFailuresBeforeDisabling, fetchingDisabledDurationMs);
+             fetchingDisabledInitTime = System.currentTimeMillis();
+         }
+     }
+
+     /**
+      * Only event-count based checkpointing can yield <code>true</code> here: the PE must be dirty, the configured
+      * frequency must be positive and the event count of the PE must be a multiple of that frequency.
+      */
+     @Override
+     public boolean isCheckpointable(ProcessingElement pe) {
+         if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.NONE)) {
+             return false;
+         }
+         if (pe.getCheckpointingConfig().frequency > 0 && pe.isDirty()) {
+             if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.EVENT_COUNT)) {
+                 if (pe.getEventCount() % pe.getCheckpointingConfig().frequency == 0) {
+                     return true;
+                 }
+             }
+         }
+
+         return false;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
new file mode 100644
index 0000000..11c5af3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
+ *
+ */
+public class SaveStateTask implements Runnable {
+
+ private static Logger logger = LoggerFactory.getLogger(SaveStateTask.class);
+
+ CheckpointId safeKeeperId;
+ byte[] serializedState;
+ Future<byte[]> futureSerializedState = null;
+ StorageCallback storageCallback;
+ StateStorage stateStorage;
+
+ public SaveStateTask(CheckpointId safeKeeperId, byte[] state, StorageCallback storageCallback,
+ StateStorage stateStorage) {
+ super();
+ this.safeKeeperId = safeKeeperId;
+ this.serializedState = state;
+ this.storageCallback = storageCallback;
+ this.stateStorage = stateStorage;
+ }
+
+ public SaveStateTask(CheckpointId safeKeeperId, Future<byte[]> futureSerializedState,
+ StorageCallback storageCallback, StateStorage stateStorage) {
+ this.safeKeeperId = safeKeeperId;
+ this.futureSerializedState = futureSerializedState;
+ this.storageCallback = storageCallback;
+ this.stateStorage = stateStorage;
+ }
+
+ @Override
+ public void run() {
+ if (futureSerializedState != null) {
+ try {
+ stateStorage.saveState(safeKeeperId, futureSerializedState.get(1000, TimeUnit.MILLISECONDS),
+ storageCallback);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ logger.warn("Cannot save checkpoint : " + safeKeeperId, e);
+ } catch (TimeoutException e) {
+ logger.warn("Cannot save checkpoint {} : could not serialize before timeout", safeKeeperId);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
new file mode 100644
index 0000000..97619b5
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Encaspulate a PE serialization operation. This operation locks the PE instance in order to avoid any inconsistent
+ * serialized state. If serialization is successful, the PE is marked as "not dirty".
+ *
+ */
+public class SerializeTask implements Callable<byte[]> {
+
+ ProcessingElement pe;
+
+ public SerializeTask(ProcessingElement pe) {
+ super();
+ this.pe = pe;
+ }
+
+ @Override
+ public byte[] call() throws Exception {
+ synchronized (pe) {
+ byte[] state = pe.serializeState();
+ pe.setDirty(false);
+ return state;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
new file mode 100644
index 0000000..cbe2bda
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+
+/**
+ * <p>
+ * Defines the methods that must be implemented by a backend storage for checkpoints.
+ * </p>
+ *
+ * NOTE: the backend implementation usually needs some kind of initialization. The recommended place to do this is in a
+ * custom method annotated with {@link Inject} annotation, which will be called after the instance is constructed.
+ *
+ */
+public interface StateStorage {
+
+ /**
+ * Stores a checkpoint.
+ *
+ * <p>
+ * NOTE: we don't handle any failure/success return value, because all failure/success notifications go through the
+ * StorageCallback reference
+ * </p>
+ *
+ * @param key
+ * safeKeeperId
+ * @param state
+ * checkpoint data as a byte array
+ * @param callback
+ * callback for receiving notifications of storage operations. This callback is configurable
+ */
+ public void saveState(CheckpointId key, byte[] state, StorageCallback callback);
+
+ /**
+ * Fetches data for a stored checkpoint.
+ * <p>
+ * Must return null if storage does not contain this key.
+ * </p>
+ *
+ * @param key
+ * safeKeeperId for this checkpoint
+ *
+ * @return stored checkpoint data, or null if the storage does not contain data for the given key
+ */
+ public byte[] fetchState(CheckpointId key);
+
+ /**
+ * Fetches all stored safeKeeper Ids.
+ *
+ * @return all stored safeKeeper Ids.
+ */
+ public Set<CheckpointId> fetchStoredKeys();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
new file mode 100644
index 0000000..6676dd0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ *
+ * Callback for reporting the result of an asynchronous storage operation
+ *
+ */
+public interface StorageCallback {
+
+ /**
+ * Notifies the result of a storage operation
+ *
+ * @param resultCode
+ * code for the result : {@link SafeKeeper.StorageResultCode SafeKeeper.StorageResultCode}
+ * @param message
+ * whatever message object is suitable
+ */
+ public void storageOperationResult(CheckpointingFramework.StorageResultCode resultCode, Object message);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
new file mode 100644
index 0000000..c448399
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ * A factory for creating storage callbacks. Storage callback implementations
+ * that can take specific actions upon success or failure of asynchronous
+ * storage operations.
+ *
+ */
+public interface StorageCallbackFactory {
+
+ /**
+ * Factory method
+ *
+ * @return returns a StorageCallback instance
+ */
+ public StorageCallback createStorageCallback();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 12f5d2f..70fe4ec 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -6,16 +6,17 @@ import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
-import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.comm.BareCommModule;
import org.apache.s4.core.triggers.TriggeredApp;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.StringEvent;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
-import com.google.common.io.Resources;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -45,9 +46,7 @@ public abstract class TriggerTest extends ZkBasedTest {
protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
final ZooKeeper zk = CommTestUtils.createZkClient();
- Injector injector = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+ Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
app = injector.getInstance(TriggeredApp.class);
app.init();
app.start();
@@ -61,7 +60,7 @@ public abstract class TriggerTest extends ZkBasedTest {
CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
CommTestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
- CommTestUtils.injectIntoStringSocketAdapter(time1);
+ app.stream.receiveEvent(new EventMessage("-1", "stream", app.getSerDeser().serialize(new StringEvent(time1))));
// check event processed
Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
@@ -69,5 +68,4 @@ public abstract class TriggerTest extends ZkBasedTest {
// return latch on trigger signal
return signalEvent1Triggered;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
new file mode 100644
index 0000000..431e31e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
+import org.apache.s4.fixtures.CommTestUtils;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+ /**
+  * Guice module for tests: binds the checkpointing framework to {@link SafeKeeper}, the state storage to
+  * {@link StorageWithUnrespondingFetching} and the storage callback factory to
+  * {@link DummyZKStorageCallbackFactory}.
+  *
+  * NOTE(review): judging by its name, the storage backend is expected not to answer fetch requests - confirm
+  * against StorageWithUnrespondingFetching.
+  */
+ public class CheckpointingModuleWithUnrespondingFetchingStorageBackend extends AbstractModule {
+
+     @Override
+     protected void configure() {
+         // root directory handed to the storage through a named String binding
+         final String storageRoot = CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath();
+         bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath"))
+                 .toInstance(storageRoot);
+
+         bind(StateStorage.class).to(StorageWithUnrespondingFetching.class);
+         bind(CheckpointingFramework.class).to(SafeKeeper.class);
+         bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+     }
+
+ }