You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/31 11:59:33 UTC
[4/4] git commit: S4-40 asynchronously serialize PEs
S4-40 asynchronously serialize PEs
- offloading this task from the event processing thread
- race conditions prevented through a coordination mechanism
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/29a22ef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/29a22ef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/29a22ef0
Branch: refs/heads/dev
Commit: 29a22ef0380fec4c355583ef082cdc45711b914e
Parents: 45efb82
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Jan 12 16:53:08 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jan 12 16:53:08 2012 +0100
----------------------------------------------------------------------
.../org/apache/s4/ft/CheckpointingCoordinator.java | 135 ++++++++++
.../src/main/java/org/apache/s4/ft/SafeKeeper.java | 208 +++++++++++----
.../main/java/org/apache/s4/ft/SaveStateTask.java | 27 ++-
.../main/java/org/apache/s4/ft/SerializeTask.java | 45 +++
.../java/org/apache/s4/processor/AbstractPE.java | 16 +-
.../main/java/org/apache/s4/util/MetricsName.java | 109 ++++----
.../test/java/org/apache/s4/ft/RecoveryTest.java | 6 +-
7 files changed, 425 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
new file mode 100644
index 0000000..7a20ea7
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.ft;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.processor.AbstractPE;
+
+/**
+ * Prevents the event processing thread and a serialization thread from working on the same PE instance at the same time, which would cause consistency issues during recovery.
+ *
+ * How it works:
+ * - we keep track of the PE being serialized and the PE being processed
+ * - access to the PE is guarded by the instance of this class
+ * - if the event processing thread receives an event that is handled by a PE currently being serialized, it waits until serialization is complete
+ * - this is expected to happen rarely, because there are many PEs and they don't get checkpointed all the time
+ * - the event processing thread blocks for at most a configurable timeout; once it expires, an error message is logged and processing proceeds anyway
+ */
+public class CheckpointingCoordinator {
+
+    private static final Logger logger = Logger.getLogger(CheckpointingCoordinator.class);
+
+    AbstractPE processing = null;
+    AbstractPE serializing = null;
+    /** Timeout in milliseconds: total wait budget for processing, re-check period for serialization. */
+    long maxSerializationLockDuration;
+    final Lock lock = new ReentrantLock();
+    final Condition processingFinished = lock.newCondition();
+    final Condition serializingFinished = lock.newCondition();
+
+    public CheckpointingCoordinator(long maxSerializationLockDuration) {
+        this.maxSerializationLockDuration = maxSerializationLockDuration;
+    }
+
+    /**
+     * Marks {@code pe} as being processed, first waiting up to the configured timeout for an ongoing serialization
+     * of the same PE. If the wait times out or is interrupted, an error is logged and processing proceeds anyway.
+     */
+    public void acquireForProcessing(AbstractPE pe) {
+        lock.lock();
+        try {
+            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(maxSerializationLockDuration);
+            try {
+                while (serializing == pe && remainingNanos > 0) {
+                    if (logger.isTraceEnabled()) {
+                        logger.trace("processing must wait for serialization to finish for PE " + pe.getId() + "/" + pe.getKeyValueString());
+                    }
+                    remainingNanos = serializingFinished.awaitNanos(remainingNanos); // returns what is left of the budget
+                }
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // stop waiting, but keep the interrupt visible to the caller
+            }
+            if (serializing == pe) {
+                logger.error("Could not acquire permit for processing after timeout of ["
+                        + maxSerializationLockDuration + "] milliseconds for PE[" + pe.getId() + "/" + pe.getKeyValueString()
+                        + "]\nProceeding anyway, but checkpoint may contain inconsistent value");
+                serializing = null;
+            }
+            processing = pe;
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Clears the processing mark of {@code pe} and wakes up serialization threads waiting for it. */
+    public void releaseFromProcessing(AbstractPE pe) {
+        lock.lock();
+        try {
+            if (processing == pe) {
+                processing = null;
+                processingFinished.signalAll();
+            } else {
+                logger.warn("Cannot release from processing thread a PE that is not already in processing state");
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    /** Marks {@code pe} as being serialized, waiting for as long as that same PE is being processed. */
+    public void acquireForSerialization(AbstractPE pe) {
+        boolean interrupted = false;
+        lock.lock();
+        try {
+            while (processing == pe) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("serialization must wait for processing to finish for PE " + pe.getId() + "/" + pe.getKeyValueString());
+                }
+                try {
+                    processingFinished.await(maxSerializationLockDuration, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    interrupted = true; // we still need to make sure it is safe to serialize: keep waiting
+                }
+            }
+            serializing = pe;
+        } finally {
+            lock.unlock();
+            if (interrupted) {
+                Thread.currentThread().interrupt(); // restore the status cleared while waiting
+            }
+        }
+    }
+
+    /** Clears the serialization mark of {@code pe} and wakes up the processing thread if it is waiting. */
+    public void releaseFromSerialization(AbstractPE pe) throws InterruptedException {
+        lock.lock();
+        try {
+            if (serializing == pe) {
+                serializing = null;
+                serializingFinished.signalAll();
+            }
+        } finally {
+            lock.unlock();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
index f47fe44..aa72010 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -17,21 +17,25 @@
*/
package org.apache.s4.ft;
-import org.apache.s4.dispatcher.Dispatcher;
-import org.apache.s4.dispatcher.partitioner.Hasher;
-import org.apache.s4.emitter.CommLayerEmitter;
-import org.apache.s4.processor.AbstractPE;
-import org.apache.s4.serialize.SerializerDeserializer;
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
+import org.apache.s4.dispatcher.Dispatcher;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.serialize.SerializerDeserializer;
+import org.apache.s4.util.MetricsName;
/**
*
@@ -46,7 +50,7 @@ import org.apache.log4j.Logger;
*/
public class SafeKeeper {
- public enum StorageResultCode {
+ public enum StorageResultCode {
SUCCESS, FAILURE
}
@@ -60,15 +64,29 @@ public class SafeKeeper {
private CountDownLatch signalNodesAvailable = new CountDownLatch(1);
private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
- ThreadPoolExecutor threadPool;
-
- int maxWriteThreads = 1;
- int writeThreadKeepAliveSeconds = 120;
- int maxOutstandingWriteRequests = 1000;
+ private ThreadPoolExecutor storageThreadPool;
+ private ThreadPoolExecutor serializationThreadPool;
+
+ private CheckpointingCoordinator processingSerializationSynchro;
+
+ private Monitor monitor;
+
+ int storageMaxThreads = 1;
+ int storageThreadKeepAliveSeconds = 120;
+ int storageMaxOutstandingRequests = 1000;
+
+ int serializationMaxThreads=1;
+ int serializationThreadKeepAliveSeconds = 120;
+ int serializationMaxOutstandingRequests = 1000;
+
+ long maxSerializationLockTime = 1000;
public SafeKeeper() {
}
+ public void setMonitor(Monitor monitor) {
+ this.monitor = monitor;
+ }
/**
* <p>
* This init() method <b>must</b> be called by the dependency injection
@@ -83,11 +101,16 @@ public class SafeKeeper {
} catch (InterruptedException e1) {
e1.printStackTrace();
}
- threadPool = new ThreadPoolExecutor(1, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
- new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
- logger.debug("Started thread pool with maxWriteThreads=[" + maxWriteThreads
- + "], writeThreadKeepAliveSeconds=[" + writeThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
- + maxOutstandingWriteRequests + "]");
+ storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests));
+ serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(serializationMaxOutstandingRequests));
+
+ processingSerializationSynchro = new CheckpointingCoordinator(maxSerializationLockTime);
+
+ logger.debug("Started thread pool with maxWriteThreads=[" + storageMaxThreads
+ + "], writeThreadKeepAliveSeconds=[" + storageThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
+ + storageMaxOutstandingRequests + "]");
int nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
// required wait until nodes are available
@@ -98,36 +121,77 @@ public class SafeKeeper {
}
nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
}
-
+
signalNodesAvailable.countDown();
}
-
+
/**
- * Forwards a call to checkpoint a PE to the backend storage.
- *
- * @param key
- * safeKeeperId
- * @param state
- * checkpoint data
+ * Synchronization to prevent race conditions with serialization threads
*/
- public void saveState(SafeKeeperId safeKeeperId, byte[] serializedState) {
- StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
- try {
- threadPool.submit(createSaveStateTask(safeKeeperId, serializedState));
- } catch (RejectedExecutionException e) {
+ public void acquirePermitForProcessing(AbstractPE pe) {
+ processingSerializationSynchro.acquireForProcessing(pe);
+ }
+
+ /**
+ * Notification part of the mechanism for preventing race condition with serialization threads
+ */
+ public void releasePermitForProcessing(AbstractPE pe) {
+ processingSerializationSynchro.releaseFromProcessing(pe);
+ }
+
+
+    /**
+     * Serializes and stores state to the storage backend. Serialization and storage operations are asynchronous.
+     *
+     * @param pe processing element whose state must be checkpointed
+     * @return a callback for getting notified of the result of the storage operation
+     */
+    public StorageCallback saveState(AbstractPE pe) {
+        StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+        Future<byte[]> futureSerializedState;
+        try {
+            futureSerializedState = serializeState(pe, processingSerializationSynchro);
+        } catch (RejectedExecutionException e) {
+            if (monitor != null) {
+                monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
+            }
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Serialization task queue is full. The checkpoint of PE [" + pe.getId() + "] was dropped."
+                    + " Remaining capacity for the serialization task queue is ["
+                    + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                    + serializationThreadPool.getQueue().size() + "] ; maximum capacity is [" + serializationMaxOutstandingRequests + "]");
+            return storageCallback;
+        }
+        submitSaveStateTask(new SaveStateTask(pe.getSafeKeeperId(), futureSerializedState, storageCallback, stateStorage), storageCallback);
+        return storageCallback;
+    }
+
+ private Future<byte[]> serializeState(AbstractPE pe, CheckpointingCoordinator coordinator) {
+ Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe, coordinator));
+ if(monitor!=null) {
+ monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
+ }
+ return future;
+ }
+
+ private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
+ try {
+ storageThreadPool.execute(task);
+ if (monitor!=null) {
+ monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1);
+ }
+ } catch (RejectedExecutionException e) {
+ if (monitor!=null) {
+ monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1);
+ }
storageCallback.storageOperationResult(StorageResultCode.FAILURE,
- "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
- + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
- + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests
+ "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
+ + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ + storageThreadPool.getQueue().size() + "] ; maximum capacity is [" + storageMaxOutstandingRequests
+ "]");
}
}
- private SaveStateTask createSaveStateTask(SafeKeeperId safeKeeperId, byte[] serializedState) {
- return new SaveStateTask(safeKeeperId, serializedState, storageCallbackFactory.createStorageCallback(),
- stateStorage);
- }
-
/**
* Fetches checkpoint data from storage for a given PE
*
@@ -232,28 +296,62 @@ public class SafeKeeper {
this.storageCallbackFactory = storageCallbackFactory;
}
- public int getMaxWriteThreads() {
- return maxWriteThreads;
- }
+ public int getStorageMaxThreads() {
+ return storageMaxThreads;
+ }
- public void setMaxWriteThreads(int maxWriteThreads) {
- this.maxWriteThreads = maxWriteThreads;
- }
+ public void setStorageMaxThreads(int storageMaxThreads) {
+ this.storageMaxThreads = storageMaxThreads;
+ }
- public int getWriteThreadKeepAliveSeconds() {
- return writeThreadKeepAliveSeconds;
- }
+ public int getStorageThreadKeepAliveSeconds() {
+ return storageThreadKeepAliveSeconds;
+ }
- public void setWriteThreadKeepAliveSeconds(int writeThreadKeepAliveSeconds) {
- this.writeThreadKeepAliveSeconds = writeThreadKeepAliveSeconds;
- }
+ public void setStorageThreadKeepAliveSeconds(int storageThreadKeepAliveSeconds) {
+ this.storageThreadKeepAliveSeconds = storageThreadKeepAliveSeconds;
+ }
- public int getMaxOutstandingWriteRequests() {
- return maxOutstandingWriteRequests;
- }
+ public int getStorageMaxOutstandingRequests() {
+ return storageMaxOutstandingRequests;
+ }
- public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
- this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
- }
+ public void setStorageMaxOutstandingRequests(int storageMaxOutstandingRequests) {
+ this.storageMaxOutstandingRequests = storageMaxOutstandingRequests;
+ }
+
+ public int getSerializationMaxThreads() {
+ return serializationMaxThreads;
+ }
+
+ public void setSerializationMaxThreads(int serializationMaxThreads) {
+ this.serializationMaxThreads = serializationMaxThreads;
+ }
+
+ public int getSerializationThreadKeepAliveSeconds() {
+ return serializationThreadKeepAliveSeconds;
+ }
+
+ public void setSerializationThreadKeepAliveSeconds(
+ int serializationThreadKeepAliveSeconds) {
+ this.serializationThreadKeepAliveSeconds = serializationThreadKeepAliveSeconds;
+ }
+
+ public int getSerializationMaxOutstandingRequests() {
+ return serializationMaxOutstandingRequests;
+ }
+
+ public void setSerializationMaxOutstandingRequests(
+ int serializationMaxOutstandingRequests) {
+ this.serializationMaxOutstandingRequests = serializationMaxOutstandingRequests;
+ }
+
+ public long getMaxSerializationLockTime() {
+ return maxSerializationLockTime;
+ }
+
+ public void setMaxSerializationLockTime(long maxSerializationLockTime) {
+ this.maxSerializationLockTime = maxSerializationLockTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
index 4ed79c9..81363a1 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
@@ -17,6 +17,11 @@
*/
package org.apache.s4.ft;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.log4j.Logger;
+
/**
*
@@ -26,20 +31,36 @@ package org.apache.s4.ft;
public class SaveStateTask implements Runnable {
SafeKeeperId safeKeeperId;
- byte[] state;
+ byte[] serializedState;
+ Future<byte[]> futureSerializedState = null;
StorageCallback storageCallback;
StateStorage stateStorage;
public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
super();
this.safeKeeperId = safeKeeperId;
- this.state = state;
+ this.serializedState = state;
+ this.storageCallback = storageCallback;
+ this.stateStorage = stateStorage;
+ }
+
+ public SaveStateTask(SafeKeeperId safeKeeperId, Future<byte[]> futureSerializedState, StorageCallback storageCallback, StateStorage stateStorage) {
+ this.safeKeeperId = safeKeeperId;
+ this.futureSerializedState = futureSerializedState;
this.storageCallback = storageCallback;
this.stateStorage = stateStorage;
}
@Override
public void run() {
- stateStorage.saveState(safeKeeperId, state, storageCallback);
+        try {
+            byte[] state = futureSerializedState != null ? futureSerializedState.get() : serializedState; // byte[] constructor leaves the future null
+            stateStorage.saveState(safeKeeperId, state, storageCallback);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            Logger.getLogger(SaveStateTask.class).warn("Cannot save checkpoint : " + safeKeeperId, e);
+            storageCallback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE, "Cannot serialize state of [" + safeKeeperId + "] : " + e.getCause());
+        }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java b/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
new file mode 100644
index 0000000..2fe2d13
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.ft;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.processor.AbstractPE;
+
+/** Serialization job for a single PE; it brackets the work with the coordinator's acquire/release calls. */
+public class SerializeTask implements Callable<byte[]> {
+    AbstractPE pe;
+    private CheckpointingCoordinator coordinator;
+
+    /** Binds the task to the PE to serialize and to the coordinator consulted before touching it. */
+    public SerializeTask(AbstractPE pe, CheckpointingCoordinator coordinator) {
+        this.pe = pe;
+        this.coordinator = coordinator;
+    }
+
+    /** Returns the serialized state of the PE; the coordinator is released even when serialization fails. */
+    @Override
+    public byte[] call() throws Exception {
+        try {
+            coordinator.acquireForSerialization(pe);
+            byte[] serializedState = pe.serializeState();
+            return serializedState;
+        } finally {
+            coordinator.releaseFromSerialization(pe);
+        }
+    }
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
index 58757d3..e4b7109 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -22,6 +22,7 @@ import org.apache.s4.dispatcher.partitioner.KeyInfo;
import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
+import org.apache.s4.ft.CheckpointingCoordinator;
import org.apache.s4.ft.InitiateCheckpointingEvent;
import org.apache.s4.ft.RecoveryEvent;
import org.apache.s4.ft.SafeKeeper;
@@ -42,6 +43,7 @@ import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -120,6 +122,7 @@ public abstract class AbstractPE implements Cloneable {
transient private int checkpointableEventCount = 0;
transient private int checkpointsBeforePause = -1;
transient private long checkpointingPauseTimeInMillis;
+
transient private OverloadDispatcher overloadDispatcher;
@@ -206,6 +209,7 @@ public abstract class AbstractPE implements Cloneable {
this.streamName = streamName;
if (safeKeeper != null) {
+ safeKeeper.acquirePermitForProcessing(this);
// initialize checkpointing event flag
this.isCheckpointingEvent = false;
if (!recoveryAttempted) {
@@ -241,7 +245,10 @@ public abstract class AbstractPE implements Cloneable {
checkpoint();
}
}
-
+ }
+
+ if (safeKeeper!=null) {
+ safeKeeper.releasePermitForProcessing(this);
}
}
@@ -539,17 +546,12 @@ public abstract class AbstractPE implements Cloneable {
protected void checkpoint() {
- byte[] serializedState = serializeState();
// NOTE: assumes pe id is keyvalue from the PE...
- saveState(getSafeKeeperId(), serializedState);
+ safeKeeper.saveState(this);
// remove dirty flag
checkpointable = false;
}
- private void saveState(SafeKeeperId key, byte[] serializedState) {
- safeKeeper.saveState(key, serializedState);
- }
-
protected void recover() {
byte[] serializedState = null;
try {
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
index b010a29..2fae80d 100644
--- a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
+++ b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
@@ -1,53 +1,56 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.s4.util;
-
-public enum MetricsName {
- // metrics event name
- S4_APP_METRICS("S4::S4AppMetrics"), S4_EVENT_METRICS("S4::S4EventMetrics"), S4_CORE_METRICS(
- "S4::S4CoreMetrics"),
-
- // metrics name
- low_level_listener_msg_in_ct("lll_in"), low_level_listener_msg_drop_ct(
- "lll_dr"), low_level_listener_qsz("lll_qsz"), low_level_listener_badmsg_ct(
- "lll_bad"), // exception can't be caught
- generic_listener_msg_in_ct("gl_in"), pecontainer_ev_dq_ct("pec_dq"), pecontainer_ev_nq_ct(
- "pec_nq"), pecontainer_msg_drop_ct("pec_dr"), pecontainer_qsz(
- "pec_qsz"), pecontainer_qsz_w("pec_qsz_w"), pecontainer_ev_process_ct(
- "pec_pr"), pecontainer_pe_ct("pec_pe"), pecontainer_ev_err_ct(
- "pec_err"), // exception can't be caught
- pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
- "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
- "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
- "pe_j_ct"), pe_error_count("pe_err");
-
- private final String eventShortName;
-
- private MetricsName(String eventShortName) {
- this.eventShortName = eventShortName;
- }
-
- public String toString() {
- return eventShortName;
- }
-
- public static void main(String[] args) {
- System.out.println(generic_listener_msg_in_ct.toString());
-
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.util;
+
+public enum MetricsName {
+ // metrics event name
+ S4_APP_METRICS("S4::S4AppMetrics"), S4_EVENT_METRICS("S4::S4EventMetrics"), S4_CORE_METRICS(
+ "S4::S4CoreMetrics"),
+
+ // metrics name
+ low_level_listener_msg_in_ct("lll_in"), low_level_listener_msg_drop_ct(
+ "lll_dr"), low_level_listener_qsz("lll_qsz"), low_level_listener_badmsg_ct(
+ "lll_bad"), // exception can't be caught
+ generic_listener_msg_in_ct("gl_in"), pecontainer_ev_dq_ct("pec_dq"), pecontainer_ev_nq_ct(
+ "pec_nq"), pecontainer_msg_drop_ct("pec_dr"), pecontainer_qsz(
+ "pec_qsz"), pecontainer_qsz_w("pec_qsz_w"), pecontainer_ev_process_ct(
+ "pec_pr"), pecontainer_pe_ct("pec_pe"), pecontainer_ev_err_ct(
+ "pec_err"), // exception can't be caught
+ pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
+ "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
+ "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
+ "pe_j_ct"), pe_error_count("pe_err"), checkpointing_dropped_from_serialization_queue("cp_ser_dr"),
+ checkpointing_dropped_from_storage_queue("cp_sto_dr"),
+ checkpointing_added_to_serialization_queue("cp_ser_in"),
+ checkpointing_added_to_storage_queue("cp_sto_in");
+
+ private final String eventShortName;
+
+ private MetricsName(String eventShortName) {
+ this.eventShortName = eventShortName;
+ }
+
+ public String toString() {
+ return eventShortName;
+ }
+
+ public static void main(String[] args) {
+ System.out.println(generic_listener_msg_in_ct.toString());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
index 1ca5652..0265ba0 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -53,7 +53,7 @@ public class RecoveryTest extends S4TestCase {
forkedS4App = TestUtils.forkS4App(getClass().getName(),
"s4_core_conf_fs_backend.xml");
// TODO synchro
- Thread.sleep(4000);
+ Thread.sleep(5000);
CountDownLatch signalValue1Set = new CountDownLatch(1);
TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
@@ -75,13 +75,13 @@ public class RecoveryTest extends S4TestCase {
// trigger checkpoint
gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
"Stream1", 0);
- signalCheckpointed.await();
+ signalCheckpointed.await(10, TimeUnit.SECONDS);
// signalCheckpointAddedByBK.await();
signalValue1Set = new CountDownLatch(1);
TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
- signalValue1Set.await();
+ signalValue1Set.await(10, TimeUnit.SECONDS);
Assert.assertEquals("value1=message1b ; value2=",
TestUtils.readFile(StatefulTestPE.DATA_FILE));