You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/31 09:27:19 UTC
git commit: provide optional recovery after expiry from persister
cache
Updated Branches:
refs/heads/S4-42 [created] b43de357d
provide optional recovery after expiry from persister cache
- also allow injection of persister through persister factory
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b43de357
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b43de357
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b43de357
Branch: refs/heads/S4-42
Commit: b43de357d49711fc319c46865fdb08f67414d4f8
Parents: 45efb82
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Jan 31 10:03:22 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jan 31 10:24:53 2012 +0100
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/ft/SafeKeeper.java | 28 ++
.../org/apache/s4/persist/ConMapPersister.java | 4 +
.../org/apache/s4/persist/HashMapPersister.java | 4 +
.../java/org/apache/s4/processor/AbstractPE.java | 31 ++-
.../org/apache/s4/processor/PrototypeWrapper.java | 4 -
.../java/org/apache/s4/ft/CheckpointingTest.java | 3 +-
.../test/java/org/apache/s4/ft/RecoveryTest.java | 4 +-
s4-core/src/test/java/org/apache/s4/ft/S4App.java | 25 ++-
.../src/test/java/org/apache/s4/ft/TestUtils.java | 4 +-
.../java/org/apache/s4/ft/pecache/CacheTestPE.java | 60 +++++
.../TestRecoveryInteractionWithPECache.java | 101 ++++++++
.../pecache/app_conf_noRecoveryAfterExpiration.xml | 18 ++
.../pecache/app_conf_recoveryAfterExpiration.xml | 18 ++
.../s4/ft/pecache/s4_core_conf_fs_backend.xml | 196 +++++++++++++++
.../java/org/apache/s4/ft/pecache/wall_clock.xml | 6 +
.../apache/s4/ft/wordcount/FTWordCountTest.java | 6 +-
.../apache/s4/processor/TestPrototypeWrapper.java | 2 +-
.../org/apache/s4/wordcount/WordCountTest.java | 2 +-
18 files changed, 491 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
index f47fe44..88e0b94 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -22,9 +22,12 @@ import org.apache.s4.dispatcher.partitioner.Hasher;
import org.apache.s4.emitter.CommLayerEmitter;
import org.apache.s4.processor.AbstractPE;
import org.apache.s4.serialize.SerializerDeserializer;
+import org.apache.s4.util.clock.Clock;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
@@ -256,4 +259,29 @@ public class SafeKeeper {
this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
}
+ /**
+ * Recovery depends on expiration settings:
+ * a PE may or may not recover its previous state if it was expired, depending on the "recoveryAfterExpiration" setting.
+ * @param abstractPE TODO
+ * @param pe TODO
+ *
+ */
+ public boolean mustRestoreState(AbstractPE pe, int ttl, Clock clock) {
+ if (pe.isRecoveryAfterExpiration()) {
+ return true;
+ } else {
+ // NOTE : ttl is not checkpointed. Get the value from current prototype
+ if (pe.getCacheAddDate()!=-1 && ttl!=-1 &&
+ (pe.getCacheAddDate() + (1000 * ttl)) <= clock.getCurrentTime()
+ ) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Not recovering PE ["+ pe.getSafeKeeperId() + "] because it was expired");
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
index 8b3f529..7c09c12 100644
--- a/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
+++ b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
@@ -17,6 +17,7 @@
*/
package org.apache.s4.persist;
+import org.apache.s4.processor.AbstractPE;
import org.apache.s4.util.clock.Clock;
import java.util.Enumeration;
@@ -121,6 +122,9 @@ public class ConMapPersister implements Persister {
ce.value = value;
ce.period = period;
ce.addTime = s4Clock.getCurrentTime();
+ if (value instanceof AbstractPE) {
+ ((AbstractPE)value).setCacheAddDate(ce.addTime);
+ }
cache.put(key, ce);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
index 53e8380..f766d9e 100644
--- a/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
+++ b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
@@ -17,6 +17,7 @@
*/
package org.apache.s4.persist;
+import org.apache.s4.processor.AbstractPE;
import org.apache.s4.util.clock.Clock;
import java.util.ArrayList;
@@ -123,6 +124,9 @@ public class HashMapPersister implements Persister {
ce.value = value;
ce.period = period;
ce.addTime = s4Clock.getCurrentTime();
+ if (value instanceof AbstractPE) {
+ ((AbstractPE)value).setCacheAddDate(ce.addTime);
+ }
cache.put(key, ce);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
index 58757d3..65a4fdd 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -83,7 +83,7 @@ public abstract class AbstractPE implements Cloneable {
}
}
- transient private Clock clock;
+ private transient Clock clock;
// FIXME replaces monitor wait on AbstractPE, for triggering possible extra
// thread when checkpointing activated
transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
@@ -120,6 +120,8 @@ public abstract class AbstractPE implements Cloneable {
transient private int checkpointableEventCount = 0;
transient private int checkpointsBeforePause = -1;
transient private long checkpointingPauseTimeInMillis;
+ private boolean isRecoveryAfterExpiration;
+ private long cacheAddDate = -1;
transient private OverloadDispatcher overloadDispatcher;
@@ -209,7 +211,7 @@ public abstract class AbstractPE implements Cloneable {
// initialize checkpointing event flag
this.isCheckpointingEvent = false;
if (!recoveryAttempted) {
- recover();
+ recover();
recoveryAttempted = true;
}
}
@@ -562,7 +564,9 @@ public abstract class AbstractPE implements Cloneable {
}
try {
AbstractPE peInOldState = deserializeState(serializedState);
- restoreState(peInOldState);
+ if (safeKeeper.mustRestoreState(peInOldState, ttl, clock)) {
+ restoreState(peInOldState);
+ }
} catch (RuntimeException e) {
Logger.getLogger("s4-ft").error("Cannot restore state for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
}
@@ -578,6 +582,10 @@ public abstract class AbstractPE implements Cloneable {
this.safeKeeperSetSignal.countDown();
}
}
+
+ public SafeKeeper getSafeKeeper() {
+ return this.safeKeeper;
+ }
public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
isCheckpointingEvent = true;
@@ -777,4 +785,21 @@ public abstract class AbstractPE implements Cloneable {
}
}
+
+ public void setCacheAddDate(long cacheAddDate) {
+ this.cacheAddDate = cacheAddDate;
+ }
+
+ public long getCacheAddDate() {
+ return cacheAddDate;
+ }
+
+ public boolean isRecoveryAfterExpiration() {
+ return isRecoveryAfterExpiration;
+ }
+
+ public void setRecoveryAfterExpiration(boolean isRecoveryAfterExpiration) {
+ this.isRecoveryAfterExpiration = isRecoveryAfterExpiration;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java b/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
index 21260ed..c233c5f 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
@@ -31,7 +31,6 @@ public class PrototypeWrapper {
private static Logger logger = Logger.getLogger(PrototypeWrapper.class);
private AbstractPE prototype;
Persister lookupTable;
- SafeKeeper safeKeeper;
public String getId() {
return prototype.getId();
@@ -125,7 +124,4 @@ public class PrototypeWrapper {
return prototype.advise();
}
- public void setSafeKeeper(SafeKeeper safeKeeper) {
- this.safeKeeper = safeKeeper;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
index 37f2a9a..3fd70a7 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
@@ -31,7 +31,7 @@ public class CheckpointingTest extends S4TestCase {
public void prepare() throws Exception {
zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
app = new S4App(getClass(), "s4_core_conf_fs_backend.xml");
- app.initializeS4App();
+ app.initializeS4App("app_conf.xml");
}
@After
@@ -101,6 +101,7 @@ public class CheckpointingTest extends S4TestCase {
keyValueStringField.setAccessible(true);
keyValueStringField.set(refPE, "value");
refPE.setId("statefulPE");
+ refPE.setCacheAddDate(pe.getCacheAddDate());
refPE.setKeys(new String[] {});
KryoSerDeser kryoSerDeser = new KryoSerDeser();
byte[] refBytes = kryoSerDeser.serialize(refPE);
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
index 1ca5652..779f1b0 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -51,7 +51,7 @@ public class RecoveryTest extends S4TestCase {
final ZooKeeper zk = TestUtils.createZkClient();
// 1. instantiate remote S4 app
forkedS4App = TestUtils.forkS4App(getClass().getName(),
- "s4_core_conf_fs_backend.xml");
+ "s4_core_conf_fs_backend.xml", "app_conf.xml");
// TODO synchro
Thread.sleep(4000);
@@ -93,7 +93,7 @@ public class RecoveryTest extends S4TestCase {
StatefulTestPE.DATA_FILE.delete();
forkedS4App = TestUtils.forkS4App(getClass().getName(),
- "s4_core_conf_fs_backend.xml");
+ "s4_core_conf_fs_backend.xml", "app_conf.xml");
// TODO synchro
Thread.sleep(2000);
// trigger recovery by sending application event to set value 2
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/S4App.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/S4App.java b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
index bd4a6df..42d46e4 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/S4App.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
@@ -59,19 +59,23 @@ public class S4App {
* @param args
* @throws Exception
*/
- public static void main(String[] args) throws Exception {
- Class testClass = Class.forName(args[0]);
- String s4CoreConfFile = args[1];
- S4App app = new S4App(testClass, s4CoreConfFile);
- S4TestCase.initS4Parameters();
- app.initializeS4App();
-
+ public static void main(String[] args) {
+ try {
+ Class testClass = Class.forName(args[0]);
+ String s4CoreConfFile = args[1];
+ String s4AppConfFile = args[2];
+ S4App app = new S4App(testClass, s4CoreConfFile);
+ S4TestCase.initS4Parameters();
+ app.initializeS4App(s4AppConfFile);
+ } catch (Exception e) {
+ Logger.getLogger(S4App.class).error("Cannot start S4 app", e);
+ }
}
/**
* Performs dependency injection and starts the S4 plaftform.
*/
- public void initializeS4App()
+ public void initializeS4App(String s4AppConfFile)
throws Exception {
initConfigPaths(testClass, s4CoreConfFileName);
ApplicationContext coreContext = null;
@@ -93,6 +97,7 @@ public class S4App {
Watcher w = (Watcher) context.getBean("watcher");
w.setConfigFilename(configBase + s4CoreConfFileName);
+ Logger.getLogger(getClass()).info("initializing app");
// load extension modules
// String[] configFileNames = getModuleConfigFiles(extsHome, prop);
// if (configFileNames.length > 0) {
@@ -105,7 +110,8 @@ public class S4App {
// }
// load application modules
- String applicationConfigFileName = configBase + "app_conf.xml";
+// String applicationConfigFileName = configBase + "app_conf.xml";
+ String applicationConfigFileName = configBase + s4AppConfFile;
String[] configFileUrls = new String[] { "file:"
+ applicationConfigFileName };
context = new FileSystemXmlApplicationContext(configFileUrls, context);
@@ -128,6 +134,7 @@ public class S4App {
}
}
((AbstractPE)bean).setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
+
} catch (NoSuchMethodException mnfe) {
// acceptable
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
index 84af34f..225c74d 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
@@ -35,7 +35,8 @@ import org.apache.zookeeper.server.ZooKeeperServer;
public class TestUtils {
public static final int ZK_PORT = 21810;
- public static Process forkS4App(String testClassName, String s4CoreConfFileName) throws IOException,
+
+ public static Process forkS4App(String testClassName, String s4CoreConfFileName, String s4AppConfFileName) throws IOException,
InterruptedException {
List<String> cmdList = new ArrayList<String>();
cmdList.add("java");
@@ -53,6 +54,7 @@ public class TestUtils {
cmdList.add(S4App.class.getName());
cmdList.add(testClassName);
cmdList.add(s4CoreConfFileName);
+ cmdList.add(s4AppConfFileName);
ProcessBuilder pb = new ProcessBuilder(cmdList);
pb.directory(new File(System.getProperty("user.dir")));
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java b/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java
new file mode 100644
index 0000000..5f4d35f
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java
@@ -0,0 +1,60 @@
+package org.apache.s4.ft.pecache;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.TestUtils;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class CacheTestPE extends AbstractPE implements Watcher {
+
+ String value = "";
+ transient ZooKeeper zk = null;
+
+ public void processEvent(KeyValue event) {
+ if (zk == null) {
+ Logger.getLogger(getClass()).info("Creating ZK connection");
+ try {
+ zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if ("key".equals(event.getKey())) {
+ setValue(this.value + event.getValue());
+ try {
+ Logger.getLogger(getClass()).info("setting ZK /value");
+ zk.create("/value", value.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new RuntimeException("unknown event " + event);
+
+ }
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public void output() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void process(WatchedEvent arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java b/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java
new file mode 100644
index 0000000..bc868ea
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java
@@ -0,0 +1,101 @@
+package org.apache.s4.ft.pecache;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4App;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.json.JSONException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRecoveryInteractionWithPECache extends S4TestCase {
+
+ private static Factory zookeeperServerConnectionFactory;
+ private Process forkedS4App;
+
+ @Before
+ public void prepare() throws IOException, InterruptedException, KeeperException {
+ TestUtils.cleanupTmpDirs();
+ zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ forkedS4App.destroy();
+ }
+
+ @Test
+ public void testNoRecoveryAfterExpiration() throws Exception {
+ byte[] data = testAndReturnValueAfterExpiration("app_conf_noRecoveryAfterExpiration.xml");
+ Assert.assertTrue("got: " + new String(data) , new String(data).equals("value2-"));
+ }
+
+ @Test
+ public void testRecoveryAfterExpiration() throws Exception {
+ byte[] data = testAndReturnValueAfterExpiration("app_conf_recoveryAfterExpiration.xml");
+ Assert.assertTrue(new String(data), new String(data).equals("value1-value2-"));
+ }
+
+
+ private byte[] testAndReturnValueAfterExpiration(String appConfig) throws Exception,
+ IOException, KeeperException, InterruptedException, JSONException {
+
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), "s4_core_conf_fs_backend.xml", appConfig);
+// app.initializeS4App();
+ final ZooKeeper zk = TestUtils.createZkClient();
+
+ CountDownLatch latch1=new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/value", latch1,
+ zk);
+
+ EventGenerator generator = new EventGenerator();
+ generator.injectValueEvent(new KeyValue("key", "value1-"),
+ "Values", 0);
+
+ latch1.await(10, TimeUnit.SECONDS);
+
+ byte[] data = zk.getData("/value", false, null);
+ Assert.assertTrue(new String(data).equals("value1-"));
+
+ zk.delete("/value", -1);
+
+ boolean nodeDeleted = false;
+ try {
+ zk.getData("/value", false, null);
+ } catch (NoNodeException e) {
+ nodeDeleted = true;
+ }
+ Assert.assertTrue(nodeDeleted);
+ // NOTE: we need sleeps to wait for checkpoints (in the absence of ZK notifications for that)
+ Thread.sleep(1000);
+ forkedS4App.destroy();
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), "s4_core_conf_fs_backend.xml", appConfig);
+ CountDownLatch latch2=new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/value", latch2,
+ zk);
+ generator.injectValueEvent(new KeyValue("key", "value2-"),
+ "Values", 0);
+ latch2.await(10, TimeUnit.SECONDS);
+ data = zk.getData("/value", false, null);
+ return data;
+ }
+
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml
new file mode 100644
index 0000000..52c3b0b
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <bean id="cacheTestPE" class="org.apache.s4.ft.pecache.CacheTestPE">
+ <property name="id" value="cacheTestPE"/>
+ <property name="keys">
+ <list>
+ <value>Values key</value>
+ </list>
+ </property>
+ <property name="checkpointingFrequencyByEventCount" value="1" />
+ <property name="ttl" value="1"/>
+ <property name="recoveryAfterExpiration" value="false"/>
+ </bean>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml
new file mode 100644
index 0000000..dd455d4
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <bean id="cacheTestPE" class="org.apache.s4.ft.pecache.CacheTestPE">
+ <property name="id" value="cacheTestPE"/>
+ <property name="keys">
+ <list>
+ <value>Values key</value>
+ </list>
+ </property>
+ <property name="checkpointingFrequencyByEventCount" value="1" />
+ <property name="ttl" value="1"/>
+ <property name="recoveryAfterExpiration" value="true"/>
+ </bean>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml
new file mode 100755
index 0000000..7f0e26d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ <bean id="propertyConfigurer"
+ class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="location">
+ <value>classpath:s4_core.properties</value>
+ </property>
+ <property name="properties">
+ <props>
+ <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+ <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+ </props>
+ </property>
+ <property name="ignoreUnresolvablePlaceholders" value="true" />
+ </bean>
+
+ <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+ <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="listenerAppName" value="${adapter_app_name}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+ <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+ <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+ </bean>
+
+ <!--START: Dispatchers for control event processor. If stream name in Response
+ is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).
+ Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+ <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+ <property name="dispatchers">
+ <list>
+ <ref bean="ctrlDispatcherFilteredS4" />
+ <ref bean="ctrlDispatcherFilteredAdapter" />
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherS4" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+ <property name="hasher" ref="hasher" />
+ <property name="debug" value="false" />
+ </bean>
+
+ <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+ <!-- END: Dispatchers for control events -->
+
+ <!-- Control Events handler -->
+ <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+ <property name="dispatcher" ref="ctrlDispatcher" />
+ </bean>
+
+ <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+ init-method="init" lazy-init="true">
+ <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ <property name="trackByKey" value="true" />
+ <property name="clock" ref="clock" />
+ <property name="controlEventProcessor" ref="ctrlHandler" />
+ <property name="safeKeeper" ref="safeKeeper" />
+ </bean>
+
+ <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="clusterManagerAddress" value="${zk_address}" />
+ <property name="appName" value="${s4_app_name}" />
+ <property name="maxQueueSize" value="${listener_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+ init-method="init">
+ <property name="rawListener" ref="rawListener" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+ init-method="init">
+ <property name="flushInterval" value="30" />
+ <property name="loggerName" value="monitor" />
+ </bean>
+
+ <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+ lazy-init="true">
+ <property name="monitor" ref="monitor" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="minimumMemory" value="52428800" />
+ </bean>
+
+
+
+
+ <!-- Some useful beans related to client-adapter for apps -->
+
+ <!-- Dispatcher to send to all adapter nodes. -->
+ <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="broadcastPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner to achieve broadcast -->
+ <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+ <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="loopbackPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+ <property name="eventEmitter" ref="commLayerEmitter"/>
+ </bean>
+
+ <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+ <property name="stateStorage" ref="fsStateStorage" />
+ <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+ <property name="serializer" ref="serDeser"/>
+ <property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+ </bean>
+
+ <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+ <bean id="fsStateStorage" class="org.apache.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+ <!-- if not specified, default is <current_dir>/tmp/storage
+ <property name="storageRootPath" value="${storage_root_path}" /> -->
+ </bean>
+
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml
new file mode 100644
index 0000000..cc571a6
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <bean id="clock" class="org.apache.s4.util.clock.WallClock"/>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
index 37a68d1..3259277 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
@@ -83,7 +83,7 @@ public class FTWordCountTest extends S4TestCase {
throws Exception {
final ZooKeeper zk = TestUtils.createZkClient();
- forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
CountDownLatch signalTextProcessed = new CountDownLatch(1);
TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
@@ -110,7 +110,7 @@ public class FTWordCountTest extends S4TestCase {
forkedS4App.destroy();
// recovering and making sure checkpointing still works
- forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
// add authorizations for continuing processing. Without these, the
// WordClassifier processed keeps waiting
@@ -136,7 +136,7 @@ public class FTWordCountTest extends S4TestCase {
// crash the app
forkedS4App.destroy();
- forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
// add authorizations for continuing processing. Without these, the
// WordClassifier processed keeps waiting
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
index 445972a..155b032 100644
--- a/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
+++ b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
@@ -2,8 +2,8 @@ package org.apache.s4.processor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import org.apache.s4.util.clock.WallClock;
+import org.apache.s4.util.clock.WallClock;
import org.junit.Test;
public class TestPrototypeWrapper
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 5a511d7..99ef2b8 100644
--- a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -46,7 +46,7 @@ public class WordCountTest extends S4TestCase {
public void testSimple() throws Exception {
S4App app = new S4App(getClass(), "s4_core_conf.xml");
- app.initializeS4App();
+ app.initializeS4App("app_conf.xml");
final ZooKeeper zk = TestUtils.createZkClient();
CountDownLatch signalTextProcessed = new CountDownLatch(1);