You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC
[36/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java
new file mode 100644
index 0000000..7aeac4c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/Dispatcher.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.Partitioner;
+import org.apache.s4.dispatcher.partitioner.VariableKeyPartitioner;
+import org.apache.s4.dispatcher.transformer.Transformer;
+import org.apache.s4.emitter.EventEmitter;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Routes events to partitions: partitioners decide where an event belongs and
+ * one EventWrapper per target partition is handed to the event emitter.
+ */
+public class Dispatcher implements EventDispatcher {
+    private EventEmitter eventEmitter;
+    private Transformer[] transformers = new Transformer[0];
+    private Partitioner[] partitioners = new Partitioner[0];
+    private String configFilename;
+    private boolean debug = false;
+    private String loggerName = "s4";
+
+    public final static String PARTITION_INFO_KEY = "S4__PartitionInfo";
+
+    public void setTransformers(Transformer[] transformers) {
+        this.transformers = transformers;
+    }
+
+    public void setPartitioners(Partitioner[] partitioners) {
+        this.partitioners = partitioners;
+    }
+
+    public void setEventEmitter(EventEmitter eventEmitter) {
+        this.eventEmitter = eventEmitter;
+    }
+
+    public EventEmitter getEventEmitter() {
+        return this.eventEmitter;
+    }
+
+    public void setConfigFilename(String configFilename) {
+        this.configFilename = configFilename;
+    }
+
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    public void setLoggerName(String loggerName) {
+        this.loggerName = loggerName;
+    }
+
+    // Counters are updated under this object's monitor and read by the stats thread.
+    private volatile int eventCount = 0;
+    private volatile int rawEventCount = 0;
+    // Emit count per partition id; allocated by the first dispatch that sees a positive node count.
+    volatile int counts[];
+
+    public Dispatcher() {
+    }
+
+    /**
+     * Starts the statistics thread. Every 15 seconds it logs the event counters
+     * with their rates and the per-partition counts, then exits the JVM with
+     * status 4 if the config file is newer than when the thread first saw it.
+     */
+    public void init() {
+        Runnable r = new Runnable() {
+            private long configFileTime = -1;
+
+            public void run() {
+                long lastCheckTime = System.currentTimeMillis();
+                int lastEventCount = eventCount;
+                int lastRawEventCount = rawEventCount;
+                while (!Thread.currentThread().isInterrupted()) {
+                    Logger log = Logger.getLogger(loggerName);
+                    int events = eventCount;
+                    int rawEvents = rawEventCount;
+                    long currentTime = System.currentTimeMillis();
+                    double seconds = (currentTime - lastCheckTime) / 1000.0;
+                    // The first pass has no elapsed time; report 0 rather than NaN.
+                    double rate = seconds > 0 ? (events - lastEventCount) / seconds : 0.0;
+                    double rawRate = seconds > 0 ? (rawEvents - lastRawEventCount) / seconds : 0.0;
+                    lastCheckTime = currentTime;
+                    lastEventCount = events;
+                    lastRawEventCount = rawEvents;
+                    log.info("Event count is " + events + "; rate " + rate);
+                    log.info("Raw event count is " + rawEvents + "; rate " + rawRate);
+                    int[] perPartition = counts;
+                    if (perPartition != null) {
+                        for (int i = 0; i < perPartition.length; i++) {
+                            log.info(i + ": " + perPartition[i]);
+                        }
+                    }
+
+                    configCheck();
+
+                    try {
+                        Thread.sleep(15000);
+                    } catch (InterruptedException ie) {
+                        // Restore the flag so the loop condition ends the thread.
+                        Thread.currentThread().interrupt();
+                    }
+                }
+            }
+
+            // Exits the process once the config file's timestamp passes the first one seen.
+            private void configCheck() {
+                if (configFilename == null) {
+                    return;
+                }
+                File file = new File(configFilename);
+                if (!file.exists()) {
+                    return;
+                }
+                long lastModified = file.lastModified();
+                if (configFileTime == -1) {
+                    configFileTime = lastModified;
+                    return;
+                }
+                if (lastModified > configFileTime) {
+                    Logger.getLogger(loggerName).info("Config file has changed. Exiting!!");
+                    System.exit(4);
+                }
+            }
+        };
+        Thread t = new Thread(r);
+        t.start();
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, List<List<String>> compoundKeyNames, Object event) {
+        dispatchEvent(streamName, event, true, compoundKeyNames);
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        dispatchEvent(streamName, event, false, null);
+    }
+
+    /**
+     * Partitions one event and emits it once per selected partition. With
+     * variableKey set, only VariableKeyPartitioner instances are consulted and
+     * they receive compoundKeyNames; otherwise every partitioner is called.
+     */
+    private void dispatchEvent(String streamName, Object event, boolean variableKey,
+            List<List<String>> compoundKeyNames) {
+        synchronized (this) {
+            rawEventCount++;
+        }
+        // Read once so the counter array and the partitioners see the same node count.
+        int nodeCount = eventEmitter.getNodeCount();
+        if (nodeCount <= 0) {
+            return;
+        }
+        try {
+            synchronized (this) {
+                eventCount++;
+                if (counts == null) {
+                    counts = new int[nodeCount];
+                }
+            }
+
+            List<CompoundKeyInfo> partionInfoList = new ArrayList<CompoundKeyInfo>();
+            for (Partitioner partitioner : partitioners) {
+                List<CompoundKeyInfo> pInfoList = null;
+                if (!variableKey) {
+                    pInfoList = partitioner.partition(streamName, event, nodeCount);
+                } else if (partitioner instanceof VariableKeyPartitioner) {
+                    VariableKeyPartitioner vp = (VariableKeyPartitioner) partitioner;
+                    pInfoList = vp.partition(streamName, compoundKeyNames, event, nodeCount);
+                }
+                // A null result means this partitioner produced nothing for the event.
+                if (pInfoList != null) {
+                    partionInfoList.addAll(pInfoList);
+                }
+            }
+
+            // Group the assignments so each partition receives a single wrapper.
+            Map<Integer, List<CompoundKeyInfo>> pInfoMap = new HashMap<Integer, List<CompoundKeyInfo>>();
+            for (CompoundKeyInfo partitionInfo : partionInfoList) {
+                int partitionId = partitionInfo.getPartitionId();
+                List<CompoundKeyInfo> listByPartitionNumber = pInfoMap.get(partitionId);
+                if (listByPartitionNumber == null) {
+                    listByPartitionNumber = new ArrayList<CompoundKeyInfo>();
+                    pInfoMap.put(partitionId, listByPartitionNumber);
+                }
+                listByPartitionNumber.add(partitionInfo);
+            }
+
+            for (Map.Entry<Integer, List<CompoundKeyInfo>> target : pInfoMap.entrySet()) {
+                int partitionId = target.getKey();
+                EventWrapper eventWrapper = new EventWrapper(streamName, event, target.getValue());
+                // Same monitor as the other counters so concurrent dispatches do not lose updates.
+                synchronized (this) {
+                    counts[partitionId]++;
+                }
+                eventEmitter.emit(partitionId, eventWrapper);
+            }
+        } catch (Exception e) {
+            Logger.getLogger(loggerName).error("Exception in processEvent on thread "
+                    + Thread.currentThread().getId() + " at time " + System.currentTimeMillis(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java
new file mode 100644
index 0000000..12099ba
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/EventDispatcher.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.List;
+
+public interface EventDispatcher {
+
+ /**
+ * Dispatch an event on the named stream. Partitioners may be used to partition
+ * the event, possibly based on a pre-determined set of fixed named keys.
+ *
+ * @param streamName
+ * name of the stream to dispatch on
+ * @param event
+ * the object to dispatch
+ */
+ void dispatchEvent(String streamName, Object event);
+
+ /**
+ * Dispatch an event on the named stream using the key names supplied with
+ * the call. VariableKeyPartitioners may be used to partition the event.
+ *
+ * @param streamName
+ * name of the stream to dispatch on
+ * @param compoundKeyNames
+ * key names to use for dispatching, one inner list per key
+ * @param event
+ * the object to dispatch
+ */
+ void dispatchEvent(String streamName, List<List<String>> compoundKeyNames,
+ Object event);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java
new file mode 100644
index 0000000..543810e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/MultiDispatcher.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.List;
+
+/**
+ * Dispatcher that forwards each event to every configured dispatcher, in
+ * array order. Nothing is dispatched until the dispatchers have been set.
+ */
+public class MultiDispatcher implements EventDispatcher {
+
+    private EventDispatcher[] dispatchers = null;
+
+    public void setDispatchers(EventDispatcher[] dispatchers) {
+        this.dispatchers = dispatchers;
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        EventDispatcher[] targets = dispatchers;
+        int total = (targets == null) ? 0 : targets.length;
+        for (int i = 0; i < total; i++) {
+            targets[i].dispatchEvent(streamName, event);
+        }
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, List<List<String>> compoundKeyNames, Object event) {
+        EventDispatcher[] targets = dispatchers;
+        int total = (targets == null) ? 0 : targets.length;
+        for (int i = 0; i < total; i++) {
+            targets[i].dispatchEvent(streamName, compoundKeyNames, event);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java
new file mode 100644
index 0000000..bf5e97b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamExcludingDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+/** Forwards events to a delegate unless their stream name is in the excluded set. */
+public class StreamExcludingDispatcher implements EventDispatcher {
+
+    private EventDispatcher dispatcher = null;
+    private HashSet<String> streams = null;
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    // Until this is called nothing is excluded; names are copied into a set for lookups.
+    public void setStreams(String[] streams) {
+        this.streams = new HashSet<String>(Arrays.asList(streams));
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        if (dispatcher == null || (streams != null && streams.contains(streamName))) {
+            return;
+        }
+        dispatcher.dispatchEvent(streamName, event);
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, List<List<String>> compoundKeyNames, Object event) {
+        if (dispatcher == null || (streams != null && streams.contains(streamName))) {
+            return;
+        }
+        dispatcher.dispatchEvent(streamName, compoundKeyNames, event);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java
new file mode 100644
index 0000000..4ea7795
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/StreamSelectingDispatcher.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+/** Forwards to a delegate only events whose stream name is in the configured set. */
+public class StreamSelectingDispatcher implements EventDispatcher {
+
+    private EventDispatcher dispatcher = null;
+    private HashSet<String> streams = null;
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    // Until this is called no event is forwarded; names are copied into a set for lookups.
+    public void setStreams(String[] streams) {
+        this.streams = new HashSet<String>(Arrays.asList(streams));
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, Object event) {
+        if (dispatcher == null || streams == null || !streams.contains(streamName)) {
+            return;
+        }
+        dispatcher.dispatchEvent(streamName, event);
+    }
+
+    @Override
+    public void dispatchEvent(String streamName, List<List<String>> compoundKeyNames, Object event) {
+        if (dispatcher == null || streams == null || !streams.contains(streamName)) {
+            return;
+        }
+        dispatcher.dispatchEvent(streamName, compoundKeyNames, event);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java
new file mode 100644
index 0000000..b3f23d3
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/BroadcastPartitioner.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Partitioner that targets every partition: for a count of N it yields one
+ * entry for each id 0..N-1. Pair it with a Dispatcher to broadcast events.
+ */
+public class BroadcastPartitioner implements Partitioner {
+
+    /**
+     * Builds one CompoundKeyInfo without key infos per partition id. The stream
+     * name and the event are ignored; a non-positive count gives an empty list.
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount) {
+        List<CompoundKeyInfo> targets = new ArrayList<CompoundKeyInfo>();
+        int nextId = 0;
+        while (nextId < partitionCount) {
+            CompoundKeyInfo target = new CompoundKeyInfo();
+            target.setPartitionId(nextId);
+            targets.add(target);
+            nextId++;
+        }
+        return targets;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java
new file mode 100644
index 0000000..f7b68f8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/CompoundKeyInfo.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Partition assignment for an event: the target partition id plus the key infos and value behind it. */
+public class CompoundKeyInfo implements Serializable {
+    List<KeyInfo> keyInfoList = new ArrayList<KeyInfo>();
+    int partitionId = -1;
+    String compoundValue;
+    String compoundKey;
+
+    public CompoundKeyInfo() {
+    }
+
+    public void addKeyInfo(KeyInfo keyInfo) {
+        keyInfoList.add(keyInfo);
+    }
+
+    public void setPartitionId(int partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    public void setCompoundKey(String compoundKey) {
+        this.compoundKey = compoundKey;
+    }
+
+    public void setCompoundValue(String compoundValue) {
+        this.compoundValue = compoundValue;
+    }
+
+    public List<KeyInfo> getKeyInfoList() {
+        return keyInfoList;
+    }
+
+    public int getPartitionId() {
+        return partitionId;
+    }
+
+    public String getCompoundKey() {
+        if (compoundKey != null) {
+            return compoundKey;
+        }
+        StringBuilder joined = new StringBuilder();
+        for (KeyInfo part : this.getKeyInfoList()) {
+            joined.append(joined.length() > 0 ? "," : "").append(part.toString());
+        }
+        // Remember the derived key; later calls return it without rebuilding.
+        compoundKey = joined.toString();
+        return compoundKey;
+    }
+
+    public String getCompoundValue() {
+        return this.compoundValue;
+    }
+
+    @Override
+    public String toString() {
+        return "{" + getCompoundKey() + " = " + getCompoundValue() + "}:" + getPartitionId();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java
new file mode 100644
index 0000000..0076154
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultHasher.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+/** Hasher that hashes the string form of a key with the FNV1_64_HASH algorithm. */
+public class DefaultHasher implements Hasher {
+    HashAlgorithm hashAlgorithm = HashAlgorithm.FNV1_64_HASH;
+
+    @Override
+    public long hash(Object hashKey) {
+        String text = String.valueOf(hashKey);
+        return hashAlgorithm.hash(text);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
new file mode 100644
index 0000000..ae2faca
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DefaultPartitioner.java
@@ -0,0 +1,351 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import org.apache.s4.schema.Schema;
+import org.apache.s4.schema.Schema.Property;
+import org.apache.s4.schema.SchemaContainer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+/**
+ * Partitioner that hashes the values found at key paths of an event to pick
+ * partitions. Request and Response events partition themselves, and a null
+ * key list selects a random partition.
+ */
+public class DefaultPartitioner implements Partitioner, VariableKeyPartitioner {
+    private List<List<String>> keyNameTuple = new ArrayList<List<String>>();
+    private boolean debug = false;
+    private Hasher hasher;
+    private Set<String> streamNameSet;
+    private String delimiter = ":";
+
+    // Separator placed between the values that make up a compound key value.
+    public void setDelimiter(String delimiter) {
+        this.delimiter = delimiter;
+    }
+
+    // Each string is a '/'-separated key path; parsed paths are appended to those already set.
+    public void setHashKey(String[] simpleKeyStrings) {
+        for (String simpleKeyAsString : simpleKeyStrings) {
+            List<String> keyNameElements = new ArrayList<String>();
+            StringTokenizer st = new StringTokenizer(simpleKeyAsString, "/");
+            while (st.hasMoreTokens()) {
+                keyNameElements.add(st.nextToken());
+            }
+            keyNameTuple.add(keyNameElements);
+        }
+    }
+
+    // Once set, events on any other (non-null) stream name are not partitioned.
+    public void setStreamNames(String[] streamNames) {
+        streamNameSet = new HashSet<String>(streamNames.length);
+        for (String eventType : streamNames) {
+            streamNameSet.add(eventType);
+        }
+    }
+
+    // Must be set before partitioning by key: key values are hashed with it.
+    public void setHasher(Hasher hasher) {
+        this.hasher = hasher;
+    }
+
+    // When enabled, diagnostics are printed to standard output.
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    private SchemaContainer schemaContainer = new SchemaContainer();
+
+    /** Partitions using the key paths configured through setHashKey. */
+    public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount) {
+        return partition(streamName, keyNameTuple, event, partitionCount);
+    }
+
+    /**
+     * Computes the partition assignments of an event for the given key paths.
+     *
+     * @param compoundKeyNames key paths to hash; null selects a random partition
+     * @param partitionCount number of partitions to choose from
+     * @return the assignments, or null when the stream is filtered out or a
+     *         key path does not resolve to a value on this event
+     */
+    public List<CompoundKeyInfo> partition(String streamName, List<List<String>> compoundKeyNames,
+            Object event, int partitionCount) {
+
+        if (streamName != null && streamNameSet != null
+                && !streamNameSet.contains(streamName)) {
+            return null;
+        }
+
+        // Some event types that need special handling
+        if (event instanceof org.apache.s4.message.Request) {
+            // construct key from request's target
+            org.apache.s4.message.Request r = (org.apache.s4.message.Request) event;
+            return r.partition(hasher, delimiter, partitionCount);
+
+        } else if (event instanceof org.apache.s4.message.Response) {
+            // partition id is encoded in Response, so use it directly.
+            org.apache.s4.message.Response r = (org.apache.s4.message.Response) event;
+            return r.partition(partitionCount);
+
+        } else if (compoundKeyNames == null) {
+            // if compoundKeyNames is null, then assign to a random partition.
+            return partitionRandom(partitionCount);
+        }
+
+        // have to compute key value and
+        // partition based on hash of that value
+
+        Schema schema = schemaContainer.getSchema(event.getClass());
+
+        if (debug) {
+            System.out.println(schema);
+        }
+
+        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+
+        // Fast path for a single top-level key. Decided on every call: the key
+        // list can differ between calls, so the decision must not be cached.
+        if (compoundKeyNames.size() == 1 && compoundKeyNames.get(0).size() == 1) {
+            String simpleKeyName = compoundKeyNames.get(0).get(0);
+            if (debug) {
+                System.out.println("Using fast path!");
+            }
+            KeyInfo keyInfo = new KeyInfo();
+            Property property = schema.getProperties().get(simpleKeyName);
+            if (property == null) {
+                return null;
+            }
+
+            Object value = null;
+            try {
+                value = property.getGetterMethod().invoke(event);
+            } catch (Exception e) {
+                // A failing getter is treated like a missing value below.
+                if (debug) {
+                    e.printStackTrace();
+                }
+            }
+
+            if (value == null) {
+                if (debug) {
+                    System.out.println("Fast path: Null value encountered");
+                }
+                return null;
+            }
+            keyInfo.addElementToPath(simpleKeyName);
+            String stringValue = String.valueOf(value);
+            keyInfo.setValue(stringValue);
+            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+            partitionInfo.addKeyInfo(keyInfo);
+            partitionInfo.setPartitionId(toPartitionId(hasher.hash(stringValue), partitionCount));
+            partitionInfo.setCompoundValue(stringValue);
+            partitionInfoList.add(partitionInfo);
+            if (debug) {
+                System.out.printf("Value %s, partition id %d\n", stringValue, partitionInfo.getPartitionId());
+            }
+            return partitionInfoList;
+        }
+
+        List<List<KeyInfo>> valueLists = new ArrayList<List<KeyInfo>>();
+        int maxSize = 0;
+
+        for (List<String> simpleKeyPath : compoundKeyNames) {
+            List<KeyInfo> keyInfoList = getKeyValues(event, schema, simpleKeyPath, 0,
+                    new ArrayList<KeyInfo>(), new KeyInfo());
+            if (keyInfoList == null || keyInfoList.size() == 0) {
+                if (debug) {
+                    System.out.println("Null value encountered");
+                }
+                return null; // do no partitioning if any simple key's value
+                             // resolves to null
+            }
+            valueLists.add(keyInfoList);
+            maxSize = Math.max(maxSize, keyInfoList.size());
+
+            if (debug) {
+                printKeyInfoList(keyInfoList);
+            }
+        }
+
+        // One assignment per index; lists shorter than the longest reuse their last entry.
+        for (int i = 0; i < maxSize; i++) {
+            StringBuilder joined = new StringBuilder();
+            CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+            for (List<KeyInfo> keyInfoList : valueLists) {
+                KeyInfo keyInfo = keyInfoList.get(Math.min(i, keyInfoList.size() - 1));
+                if (joined.length() > 0) {
+                    joined.append(delimiter);
+                }
+                joined.append(keyInfo.getValue());
+                partitionInfo.addKeyInfo(keyInfo);
+            }
+
+            // get the partition id
+            String compoundValue = joined.toString();
+            partitionInfo.setPartitionId(toPartitionId(hasher.hash(compoundValue), partitionCount));
+            partitionInfo.setCompoundValue(compoundValue);
+            partitionInfoList.add(partitionInfo);
+            if (debug) {
+                System.out.printf("Value %s, partition id %d\n", compoundValue, partitionInfo.getPartitionId());
+            }
+        }
+
+        return partitionInfoList;
+    }
+
+    // Reduces a hash to an id in [0, partitionCount); a negative remainder is
+    // shifted up so a hasher returning negative values still yields a valid id.
+    private static int toPartitionId(long hash, int partitionCount) {
+        long remainder = hash % partitionCount;
+        return (int) (remainder < 0 ? remainder + partitionCount : remainder);
+    }
+
+    // Assign to random partition
+    private List<CompoundKeyInfo> partitionRandom(int partitionCount) {
+        CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+
+        // choose a random int from [0, partitionCount-1]
+        int partitionId = (int) Math.min(partitionCount - 1, Math.floor(Math.random() * partitionCount));
+
+        partitionInfo.setPartitionId(partitionId);
+        List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+        partitionInfoList.add(partitionInfo);
+
+        return partitionInfoList;
+    }
+
+    private void printKeyInfoList(List<KeyInfo> keyInfoList) {
+        for (KeyInfo aKeyInfo : keyInfoList) {
+            System.out.printf("Path: %s; full path %s; value %s\n",
+                    aKeyInfo.toString(), aKeyInfo.toString(true), aKeyInfo.getValue());
+        }
+    }
+
+    /**
+     * Walks one key path through the record, descending into nested schemas
+     * and fanning out over list properties, and appends a KeyInfo for every
+     * value reached.
+     *
+     * @return keyInfoList, or null if a property is missing or a value is null
+     */
+    private List<KeyInfo> getKeyValues(Object record, Schema schema, List<String> keyNameElements,
+            int elementIndex, List<KeyInfo> keyInfoList, KeyInfo keyInfo) {
+        String keyElement = keyNameElements.get(elementIndex);
+        Property property = schema.getProperties().get(keyElement);
+        if (property == null) {
+            return null;
+        }
+
+        keyInfo.addElementToPath(keyElement);
+
+        Object value = null;
+        try {
+            value = property.getGetterMethod().invoke(record);
+        } catch (Exception e) {
+            // A failing getter is treated like a null value below.
+            if (debug) {
+                System.out.println("key element is " + keyElement);
+                e.printStackTrace();
+            }
+        }
+
+        if (value == null) {
+            return null; // return a null KeyInfo list if we hit a null value
+        }
+        if (property.isList()) {
+            List<?> list = (List<?>) value;
+            // TODO: handle case where key does not include property of
+            // component type
+            Schema componentSchema = property.getComponentProperty().getSchema();
+            int listLength = list.size();
+            for (int i = 0; i < listLength; i++) {
+                Object listEntry = list.get(i);
+                KeyInfo keyInfoForListEntry = keyInfo.copy();
+                keyInfoForListEntry.addElementToPath(i);
+                Object partialList = getKeyValues(listEntry, componentSchema, keyNameElements,
+                        elementIndex + 1, keyInfoList, keyInfoForListEntry);
+                if (partialList == null) {
+                    return null;
+                }
+            }
+        } else if (property.getSchema() != null) {
+            return getKeyValues(value, property.getSchema(), keyNameElements,
+                    elementIndex + 1, keyInfoList, keyInfo);
+        } else {
+            keyInfo.setValue(String.valueOf(value));
+            keyInfoList.add(keyInfo);
+        }
+
+        return keyInfoList;
+    }
+
+    // Manual smoke run: partitions two sample map events with debug output enabled.
+    public static void main(String args[]) {
+        DefaultPartitioner dp1 = new DefaultPartitioner();
+        DefaultPartitioner dp2 = new DefaultPartitioner();
+        dp1.setDebug(true);
+        dp1.setHashKey(new String[] { "array1/val1", "array1/val2", "query" });
+        dp1.setHasher(new DefaultHasher());
+
+        dp2.setDebug(true);
+        dp2.setHashKey(new String[] { "user" });
+        dp2.setHasher(new DefaultHasher());
+
+        Map<String, Object> event = new HashMap<String, Object>();
+        event.put("user", "fred");
+        event.put("query", "timex watch");
+        List<Map<String, Object>> array1 = new ArrayList<Map<String, Object>>();
+        Map<String, Object> element = new HashMap<String, Object>();
+        element.put("val1", Long.valueOf(72L));
+        element.put("val2", Long.valueOf(11L));
+        array1.add(element);
+        element = new HashMap<String, Object>();
+        element.put("val1", Long.valueOf(21L));
+        element.put("val2", Long.valueOf(12L));
+        array1.add(element);
+        event.put("array1", array1);
+
+        dp1.partition("test", event, 4);
+        System.out.println("------------");
+        dp2.partition("test", event, 4);
+        System.out.println("------------");
+        event = new HashMap<String, Object>();
+
+        event.put("query", "timex watch");
+        array1 = new ArrayList<Map<String, Object>>();
+        element = new HashMap<String, Object>();
+        element.put("val1", Long.valueOf(72L));
+        element.put("val2", Long.valueOf(11L));
+        array1.add(element);
+        element = new HashMap<String, Object>();
+
+        element.put("val2", Long.valueOf(12L));
+        array1.add(element);
+        event.put("array1", array1);
+
+        dp1.partition("test", event, 4);
+        System.out.println("------------");
+        dp2.partition("test", event, 4);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java
new file mode 100644
index 0000000..5384ecd
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/DummyPartitioner.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DummyPartitioner implements Partitioner {
+
+    /**
+     * Routes every event to partition 0.
+     *
+     * None of the arguments are consulted: the stream name, the event and
+     * the partition count are all ignored.
+     *
+     * @return a new single-element list whose only entry has partition id 0
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount) {
+        CompoundKeyInfo fixedTarget = new CompoundKeyInfo();
+        fixedTarget.setPartitionId(0);
+
+        List<CompoundKeyInfo> targets = new ArrayList<CompoundKeyInfo>();
+        targets.add(fixedTarget);
+        return targets;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java
new file mode 100644
index 0000000..1704431
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/HashAlgorithm.java
@@ -0,0 +1,160 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import org.apache.s4.util.KeyUtil;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.zip.CRC32;
+
+/** from
+ * http://github.com/dustin/java-memcached-client/blob/master/src/main/java/net/spy/memcached/HashAlgorithm.java
+ */
+
+/**
+ * Known hashing algorithms for locating a server for a key. Note that all hash
+ * algorithms return 64-bits of hash, but only the lower 32-bits are
+ * significant. This allows a positive 32-bit number to be returned for all
+ * cases.
+ */
+public enum HashAlgorithm {
+
+    /**
+     * Native hash (String.hashCode()).
+     */
+    NATIVE_HASH,
+    /**
+     * CRC32_HASH as used by the perl API. More consistent across API users
+     * and java versions than the native hash, but most likely significantly
+     * slower.
+     */
+    CRC32_HASH,
+    /**
+     * 64-bit FNV1. FNV hashes are designed to be fast while maintaining a
+     * low collision rate.
+     *
+     * @see <a href="http://www.isthe.com/chongo/tech/comp/fnv/">fnv
+     *      comparisons</a>
+     * @see <a href="http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash">fnv at
+     *      wikipedia</a>
+     */
+    FNV1_64_HASH,
+    /**
+     * 64-bit FNV1a (xor first, then multiply).
+     */
+    FNV1A_64_HASH,
+    /**
+     * 32-bit FNV1.
+     */
+    FNV1_32_HASH,
+    /**
+     * 32-bit FNV1a.
+     */
+    FNV1A_32_HASH,
+    /**
+     * MD5-based hash algorithm used by ketama.
+     */
+    KETAMA_HASH;
+
+    private static final long FNV_64_INIT = 0xcbf29ce484222325L;
+    private static final long FNV_64_PRIME = 0x100000001b3L;
+
+    private static final long FNV_32_INIT = 2166136261L;
+    private static final long FNV_32_PRIME = 16777619;
+
+    /**
+     * Compute the hash for the given key using this algorithm.
+     *
+     * @param k the key to hash
+     * @return the hash masked to its low 32 bits, i.e. a value between 0 and
+     *         0xffffffff inclusive
+     */
+    public long hash(final String k) {
+        long hashed = 0;
+        switch (this) {
+            case NATIVE_HASH:
+                hashed = k.hashCode();
+                break;
+            case CRC32_HASH:
+                hashed = crc32Hash(k);
+                break;
+            case FNV1_64_HASH:
+                // Thanks to pierre@demartines.com for the pointer
+                hashed = fnv1(k, FNV_64_INIT, FNV_64_PRIME);
+                break;
+            case FNV1A_64_HASH:
+                hashed = fnv1a(k, FNV_64_INIT, FNV_64_PRIME);
+                break;
+            case FNV1_32_HASH:
+                hashed = fnv1(k, FNV_32_INIT, FNV_32_PRIME);
+                break;
+            case FNV1A_32_HASH:
+                hashed = fnv1a(k, FNV_32_INIT, FNV_32_PRIME);
+                break;
+            case KETAMA_HASH:
+                hashed = ketamaHash(k);
+                break;
+            default:
+                assert false;
+        }
+        return hashed & 0xffffffffL; /* Truncate to 32-bits */
+    }
+
+    /**
+     * CRC32 of the key bytes, reduced the way the perl API does it:
+     * (crc32(shift) >> 16) & 0x7fff.
+     */
+    private static long crc32Hash(String key) {
+        CRC32 checksum = new CRC32();
+        checksum.update(KeyUtil.getKeyBytes(key));
+        return (checksum.getValue() >> 16) & 0x7fff;
+    }
+
+    /**
+     * FNV1: for each char of the key, multiply by the prime and then xor the
+     * char in. Arithmetic is done in a long and wraps silently.
+     */
+    private static long fnv1(String key, long seed, long prime) {
+        long acc = seed;
+        for (char c : key.toCharArray()) {
+            acc *= prime;
+            acc ^= c;
+        }
+        return acc;
+    }
+
+    /**
+     * FNV1a: same as FNV1 but with the two steps swapped (xor the char in,
+     * then multiply by the prime).
+     */
+    private static long fnv1a(String key, long seed, long prime) {
+        long acc = seed;
+        for (char c : key.toCharArray()) {
+            acc ^= c;
+            acc *= prime;
+        }
+        return acc;
+    }
+
+    /**
+     * Packs the first four bytes of the key's MD5 digest into a long, with
+     * byte 0 as the least significant byte.
+     */
+    private static long ketamaHash(String key) {
+        byte[] digest = computeMd5(key);
+        long packed = 0;
+        for (int i = 3; i >= 0; i--) {
+            packed = (packed << 8) | (digest[i] & 0xFF);
+        }
+        return packed;
+    }
+
+    /**
+     * Get the md5 of the given key.
+     *
+     * @param k the key; its bytes are obtained through KeyUtil.getKeyBytes
+     * @return the MD5 digest bytes
+     * @throws RuntimeException if the platform has no MD5 implementation
+     */
+    public static byte[] computeMd5(String k) {
+        final MessageDigest digester;
+        try {
+            digester = MessageDigest.getInstance("MD5");
+        } catch (NoSuchAlgorithmException e) {
+            throw new RuntimeException("MD5 not supported", e);
+        }
+        digester.reset();
+        return digester.digest(KeyUtil.getKeyBytes(k));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java
new file mode 100644
index 0000000..863a34c
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Hasher.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+/**
+ * Strategy for turning a hash key object into a numeric hash.
+ */
+public interface Hasher {
+    /**
+     * Hashes the supplied key.
+     *
+     * @param hashKey the object to hash
+     * @return the hash value; its range is up to the implementation
+     */
+    long hash(Object hashKey);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java
new file mode 100644
index 0000000..ebdfc5b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/KeyInfo.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class KeyInfo implements Serializable {
+ List<KeyPathElement> keyPath = new ArrayList<KeyPathElement>();
+ String value;
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return this.value;
+ }
+
+ public void addElementToPath(String keyName) {
+ keyPath.add(new KeyPathElementName(keyName));
+ }
+
+ public void addElementToPath(int index) {
+ keyPath.add(new KeyPathElementIndex(index));
+ }
+
+ private void addElementToPath(KeyPathElement keyPathElement) {
+ keyPath.add(keyPathElement);
+ }
+
+ public List<KeyPathElement> getKeyPath() {
+ return keyPath;
+ }
+
+ public static class KeyPathElement implements Serializable {
+ public enum PathElementType {
+ KEY_NAME, INDEX
+ }
+
+ PathElementType pathElementType;
+
+ public PathElementType getPathElementType() {
+ return pathElementType;
+ }
+
+ }
+
+ public static class KeyPathElementName extends KeyPathElement {
+ String keyName;
+
+ public KeyPathElementName() {
+
+ }
+
+ public KeyPathElementName(String keyName) {
+ pathElementType = PathElementType.KEY_NAME;
+ this.keyName = keyName;
+ }
+
+ public String getKeyName() {
+ return keyName;
+ }
+ }
+
+ public static class KeyPathElementIndex extends KeyPathElement {
+ int index;
+
+ public KeyPathElementIndex() {
+
+ }
+
+ public KeyPathElementIndex(int index) {
+ pathElementType = PathElementType.INDEX;
+ this.index = index;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+ }
+
+ public KeyInfo copy() {
+ KeyInfo newKeyInfo = new KeyInfo();
+ for (KeyPathElement element : keyPath) {
+ newKeyInfo.addElementToPath(element);
+ }
+ return newKeyInfo;
+ }
+
+ public String toString() {
+ return toString(false);
+ }
+
+ public String toString(boolean showFull) {
+ StringBuffer sb = new StringBuffer();
+ for (KeyPathElement element : keyPath) {
+ if (element.getPathElementType() == KeyPathElement.PathElementType.KEY_NAME) {
+ if (sb.length() > 0) {
+ sb.append("/");
+ }
+ sb.append(((KeyPathElementName) element).getKeyName());
+ } else if (showFull) {
+ sb.append("[")
+ .append(((KeyPathElementIndex) element).getIndex())
+ .append("]");
+ }
+ }
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java
new file mode 100644
index 0000000..a527713
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/LoopbackPartitioner.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.emitter.EventEmitter;
+import org.apache.s4.listener.EventListener;
+import org.apache.s4.processor.PEContainer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A partitioner that assigns events to the current partition, as given by the comm layer.
+ *
+ */
+public class LoopbackPartitioner implements Partitioner, VariableKeyPartitioner {
+
+ CommLayerEmitter emitter;
+
+ @Override
+ public List<CompoundKeyInfo> partition(String streamName,
+ List<List<String>> compoundKeyNames, Object event,
+ int partitionCount) {
+ List<CompoundKeyInfo> partitionInfoList = new ArrayList<CompoundKeyInfo>();
+ CompoundKeyInfo partitionInfo = new CompoundKeyInfo();
+ StringBuilder compoundKeyBuilder = new StringBuilder();
+ // This partitioning ignores the values of the keyed attributes;
+ // it partitions to the current partition id of the pe container
+ partitionInfo.setPartitionId(emitter.getListener().getId());
+ for (List<String> keyNames : compoundKeyNames) {
+ for (String keyName : keyNames) {
+ compoundKeyBuilder.append(keyName);
+ }
+ }
+ partitionInfo.setCompoundKey(compoundKeyBuilder.toString());
+ partitionInfoList.add(partitionInfo);
+ return partitionInfoList;
+ }
+
+ @Override
+ public List<CompoundKeyInfo> partition(String streamName, Object event,
+ int partitionCount) {
+ return partition(streamName, new ArrayList<List<String>>(0), event,
+ partitionCount);
+ }
+
+ /**
+ * A reference on the emitter allows getting the current partition id from the comm layer
+ * @param emitter comm layer emitter
+ */
+ public void setEventEmitter(CommLayerEmitter emitter) {
+ this.emitter = emitter;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java
new file mode 100644
index 0000000..cc5a1f3
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/Partitioner.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.List;
+
+/**
+ * Decides which partition(s) an event is sent to.
+ */
+public interface Partitioner {
+    /**
+     * Computes the partition assignments for one event.
+     *
+     * @param streamName name of the stream the event belongs to
+     * @param event the event to partition
+     * @param partitionCount number of partitions available
+     * @return the assignments for this event; NOTE(review): some
+     *         implementations (e.g. RoundRobinPartitioner) return null for
+     *         streams they do not handle, so callers should expect null
+     */
+    List<CompoundKeyInfo> partition(String streamName, Object event, int partitionCount);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java
new file mode 100644
index 0000000..eb4fc04
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/RoundRobinPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class RoundRobinPartitioner implements Partitioner {
+    private int counter = 0;
+    private Set<String> streamNameSet;
+
+    /**
+     * Restricts this partitioner to the given stream names. Until this is
+     * called, events of every stream are partitioned.
+     *
+     * @param streamNames the stream names to accept
+     */
+    public void setStreamNames(String[] streamNames) {
+        Set<String> accepted = new HashSet<String>(streamNames.length);
+        for (int i = 0; i < streamNames.length; i++) {
+            accepted.add(streamNames[i]);
+        }
+        streamNameSet = accepted;
+    }
+
+    /**
+     * Assigns the event to the next partition in rotation.
+     *
+     * @return a new single-element list with the chosen partition id, or
+     *         null when a stream filter is set, streamName is non-null and
+     *         the filter does not contain it
+     */
+    @Override
+    public List<CompoundKeyInfo> partition(String streamName, Object event,
+                                           int partitionCount) {
+
+        boolean filterApplies = streamName != null && streamNameSet != null;
+        if (filterApplies && !streamNameSet.contains(streamName)) {
+            return null;
+        }
+
+        CompoundKeyInfo chosen = new CompoundKeyInfo();
+        chosen.setPartitionId(advance(partitionCount));
+
+        List<CompoundKeyInfo> result = new ArrayList<CompoundKeyInfo>();
+        result.add(chosen);
+        return result;
+    }
+
+    /**
+     * Increments the shared counter under this object's lock and maps it
+     * onto a partition id. When the increment overflows to a negative
+     * value the counter restarts at 0.
+     */
+    private synchronized int advance(int partitionCount) {
+        counter++;
+        if (counter < 0) {
+            counter = 0;
+        }
+        return counter % partitionCount;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java
new file mode 100644
index 0000000..b6f1d31
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/TestDefaultPartitioner.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestDefaultPartitioner {
+ public static void main(String[] args) {
+ DefaultPartitioner dp1 = new DefaultPartitioner();
+ DefaultPartitioner dp2 = new DefaultPartitioner();
+ dp1.setDebug(true);
+ dp1.setHashKey(new String[] { "list1/val1", "list1/val2", "query" });
+ dp1.setHasher(new DefaultHasher());
+
+ dp2.setDebug(true);
+ dp2.setHashKey(new String[] { "user" });
+ dp2.setHasher(new DefaultHasher());
+
+ TopLevel tl1 = new TopLevel();
+ tl1.setQuery("Hello there");
+ tl1.setUser("spitzer");
+
+ for (int i = 0; i < 4; i++) {
+ Nested n = new Nested();
+ n.setVal1(i + 77);
+ n.setVal2(i / 10.7);
+ tl1.addNested(n);
+ }
+
+ dp1.partition("test", tl1, 4);
+ dp2.partition("test", tl1, 4);
+
+ }
+
+ static class TopLevel {
+ private String query;
+ private List<Nested> list1 = new ArrayList<Nested>();
+ private String user;
+
+ public String getQuery() {
+ return query;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ public List<Nested> getList1() {
+ return list1;
+ }
+
+ public void setList1(List<Nested> list1) {
+ this.list1 = list1;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public void addNested(Nested nested) {
+ list1.add(nested);
+ }
+ }
+
+ static class Nested {
+ long val1;
+ double val2;
+
+ public Nested() {
+
+ }
+
+ public long getVal1() {
+ return val1;
+ }
+
+ public void setVal1(long val1) {
+ this.val1 = val1;
+ }
+
+ public double getVal2() {
+ return val2;
+ }
+
+ public void setVal2(double val2) {
+ this.val2 = val2;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java
new file mode 100644
index 0000000..722de91
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/partitioner/VariableKeyPartitioner.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.partitioner;
+
+import java.util.List;
+
+/**
+ * A partitioner whose key names are supplied with each call.
+ */
+public interface VariableKeyPartitioner {
+    /**
+     * Computes the partition assignments for one event using the given key
+     * names.
+     *
+     * @param streamName name of the stream the event belongs to
+     * @param keyNames groups of key names; presumably each inner list
+     *        forms one compound key (TODO confirm against implementations)
+     * @param event the event to partition
+     * @param partitionCount number of partitions available
+     * @return the assignments for this event
+     */
+    List<CompoundKeyInfo> partition(String streamName,
+                                    List<List<String>> keyNames,
+                                    Object event,
+                                    int partitionCount);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java b/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java
new file mode 100644
index 0000000..17cff66
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/dispatcher/transformer/Transformer.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.dispatcher.transformer;
+
+
+/**
+ * Maps an event object to another object.
+ */
+public interface Transformer {
+    /**
+     * Transforms the given event.
+     *
+     * @param event the event to transform
+     * @return the result of the transformation, as defined by the
+     *         implementation
+     */
+    Object transform(Object event);
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java b/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java
new file mode 100644
index 0000000..155f933
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/emitter/CommLayerEmitter.java
@@ -0,0 +1,261 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.emitter;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.S4_EVENT_METRICS;
+import static org.apache.s4.util.MetricsName.low_level_emitter_msg_out_ct;
+import static org.apache.s4.util.MetricsName.low_level_emitter_out_err_ct;
+import static org.apache.s4.util.MetricsName.low_level_emitter_qsz;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.comm.core.SenderProcess;
+import org.apache.s4.comm.core.Serializer;
+import org.apache.s4.listener.CommLayerListener;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.message.Request;
+import org.apache.s4.serialize.SerializerDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Event emitter that queues outgoing events and hands them to a
+ * SenderProcess from a dedicated thread started by {@link #init()}.
+ */
+public class CommLayerEmitter implements EventEmitter, Runnable {
+    private static Logger logger = Logger.getLogger(CommLayerEmitter.class);
+
+    // config for emitter is coupled with listener, if there is a non-null
+    // listener. This prevents 2 tasks from being checked out of the cluster
+    // config for the same node: one for the listener and another for the
+    // emitter.
+    private CommLayerListener listener;
+
+    private SenderProcess sender;
+    // volatile: assigned on the emitter thread in run() and read through
+    // getNodeCount() by whichever thread calls it.
+    private volatile int nodeCount;
+    private BlockingQueue<MessageHolder> messageQueue = new LinkedBlockingDeque<MessageHolder>();
+    private String senderId;
+    private String clusterManagerAddress;
+    private String appName;
+    private String listenerAppName = null;
+    private Monitor monitor;
+    private SerializerDeserializer serDeser;
+
+    public void setSerDeser(SerializerDeserializer serDeser) {
+        this.serDeser = serDeser;
+    }
+
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
+
+    public void setSenderId(String senderId) {
+        this.senderId = senderId;
+    }
+
+    public void setClusterManagerAddress(String clusterManagerAddress) {
+        this.clusterManagerAddress = clusterManagerAddress;
+    }
+
+    public void setAppName(String appName) {
+        this.appName = appName;
+    }
+
+    public void setListenerAppName(String listenerAppName) {
+        this.listenerAppName = listenerAppName;
+    }
+
+    public void setListener(CommLayerListener listener) {
+        this.listener = listener;
+    }
+
+    public CommLayerListener getListener() {
+        return this.listener;
+    }
+
+    /**
+     * Starts the thread (named "CommLayerEmitter") that executes
+     * {@link #run()}.
+     */
+    public void init() {
+
+        Thread t = new Thread(this, "CommLayerEmitter");
+        t.start();
+    }
+
+    /**
+     * Adds a message to the outgoing queue and, when a monitor is set,
+     * publishes the new queue size. Failures while updating the monitor
+     * are logged and do not affect the queued message.
+     *
+     * @param messageHolder the message to queue
+     */
+    public void queueMessage(MessageHolder messageHolder) {
+        messageQueue.add(messageHolder);
+        try {
+            if (monitor != null) {
+                monitor.set(low_level_emitter_qsz.toString(),
+                            messageQueue.size(),
+                            S4_CORE_METRICS.toString());
+            }
+        } catch (Exception e) {
+            logger.error("Exception in monitor metrics on thread "
+                    + Thread.currentThread().getId(), e);
+        }
+    }
+
+    /**
+     * Queues an event for the given partition. Sending happens later on
+     * the emitter thread.
+     *
+     * @param partitionId partition to send to
+     * @param eventWrapper the event to send
+     * @throws RuntimeException rethrown, after counting and logging it, if
+     *         queueing fails with a runtime exception
+     */
+    @Override
+    public void emit(int partitionId, EventWrapper eventWrapper) {
+
+        // Special handling required for Requests
+        if (eventWrapper.getEvent() instanceof Request) {
+            decorateRequest((Request) eventWrapper.getEvent());
+        }
+
+        try {
+            MessageHolder mh = new MessageHolder(partitionId, eventWrapper);
+            queueMessage(mh);
+        } catch (RuntimeException rte) {
+            if (monitor != null) {
+                monitor.increment(low_level_emitter_out_err_ct.toString(),
+                                  1,
+                                  S4_EVENT_METRICS.toString(),
+                                  "et",
+                                  eventWrapper.getStreamName());
+            }
+            Logger.getLogger("s4").error("Error serializing or emitting event "
+                                                 + eventWrapper.getEvent(),
+                                         rte);
+            throw rte;
+        }
+    }
+
+    // Add partition id of sender
+    private void decorateRequest(Request r) {
+        Request.RInfo rinfo = r.getRInfo();
+
+        if (rinfo != null && listener != null)
+            rinfo.setPartition(listener.getId());
+    }
+
+    /**
+     * @return 1 when no listener is configured; otherwise the partition
+     *         count obtained from the sender in {@link #run()} (0 until
+     *         the sender has been created)
+     */
+    @Override
+    public int getNodeCount() {
+        if (listener == null) {
+            return 1;
+        }
+        return nodeCount;
+    }
+
+    /**
+     * Creates the sender and then forwards queued messages until the
+     * thread is interrupted.
+     *
+     * Without a listener the sender acquires its own task. With a listener
+     * the sender is built from the listener's config, waiting on the
+     * listener's monitor until that config is available.
+     */
+    @Override
+    public void run() {
+        if (listener == null) {
+            if (listenerAppName == null) {
+                listenerAppName = appName;
+            }
+            sender = new SenderProcess(clusterManagerAddress,
+                                       appName,
+                                       listenerAppName);
+            Map<String, String> map = new HashMap<String, String>();
+            map.put("SenderId", "" + senderId);
+            sender.setSerializer(new PassThroughSerializer());
+            sender.acquireTaskAndCreateSender(map);
+        } else {
+            Object listenerConfig = null;
+            try {
+                listenerConfig = listener.getListenerConfig();
+                if (listenerConfig == null) {
+                    logger.info("CommLayerEmitter going to wait for listener to acquire task");
+                    synchronized (listener) {
+                        listenerConfig = listener.getListenerConfig();
+                        // Object.wait() can return without the condition
+                        // holding (spurious or unrelated wakeup), so the
+                        // config is re-checked in a loop rather than once.
+                        while (listenerConfig == null) {
+                            listener.wait();
+                            listenerConfig = listener.getListenerConfig();
+                        }
+                    }
+                }
+            } catch (InterruptedException ie) {
+                // Interrupted before a config was available: keep the
+                // interrupt status set and stop instead of building a
+                // sender from a null config.
+                Thread.currentThread().interrupt();
+                logger.info("Exception in CommLayerEmitter.run()", ie);
+                return;
+            } catch (Exception e) {
+                logger.info("Exception in CommLayerEmitter.run()", e);
+            }
+            logger.info("Creating sender process with " + listenerConfig);
+
+            String destinationAppName = (listenerAppName != null
+                    ? listenerAppName : listener.getAppName());
+
+            sender = new SenderProcess(listener.getClusterManagerAddress(),
+                                       listener.getAppName(),
+                                       destinationAppName);
+
+            sender.setSerializer(new PassThroughSerializer());
+            sender.createSenderFromConfig(listenerConfig);
+            nodeCount = sender.getNumOfPartitions();
+        }
+        boolean isSent = false;
+        while (!Thread.interrupted()) {
+            isSent = false;
+            try {
+                MessageHolder mh = messageQueue.take();
+                byte[] rawMessage = serDeser.serialize(mh.getEventWrapper());
+                if (listener == null) {
+                    isSent = sender.send(rawMessage);
+                } else {
+                    isSent = sender.sendToPartition(mh.getPartitionId(),
+                                                    rawMessage);
+                }
+
+                if (isSent) {
+                    if (monitor != null) {
+                        monitor.increment(low_level_emitter_msg_out_ct.toString(),
+                                          1,
+                                          S4_CORE_METRICS.toString());
+                    }
+                } else {
+                    if (monitor != null) {
+                        monitor.increment(low_level_emitter_out_err_ct.toString(),
+                                          1,
+                                          S4_CORE_METRICS.toString());
+                    }
+                    logger.warn("commlayer emit failed ...");
+                }
+            } catch (InterruptedException ie) {
+                if (monitor != null) {
+                    monitor.increment(low_level_emitter_out_err_ct.toString(),
+                                      1,
+                                      S4_CORE_METRICS.toString());
+                }
+                // re-set the flag so the loop condition sees it and exits
+                Thread.currentThread().interrupt();
+            } catch (Exception e) {
+                Logger.getLogger("s4").error("Error emitting message", e);
+            }
+        }
+    }
+
+    /**
+     * Serializer that returns byte arrays unchanged and null for any other
+     * input.
+     */
+    public class PassThroughSerializer implements Serializer {
+        public byte[] serialize(Object input) {
+            if (input instanceof byte[]) {
+                return (byte[]) input;
+            } else {
+                return null;
+            }
+        }
+    }
+
+    /** Pairs an event with the partition it should be sent to. */
+    class MessageHolder {
+        private int partitionId;
+        private EventWrapper eventWrapper;
+
+        MessageHolder(int partitionId, EventWrapper eventWrapper) {
+            this.partitionId = partitionId;
+            this.eventWrapper = eventWrapper;
+        }
+
+        int getPartitionId() {
+            return partitionId;
+        }
+
+        EventWrapper getEventWrapper() {
+            return eventWrapper;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java b/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java
new file mode 100644
index 0000000..715eb2e
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/emitter/EventEmitter.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.emitter;
+
+import org.apache.s4.collector.EventWrapper;
+
+/**
+ * Contract for components able to emit wrapped events towards a partition.
+ */
+public interface EventEmitter {
+    /**
+     * Emits the given event to the given partition.
+     *
+     * @param partitionId id of the target partition
+     *        (NOTE(review): valid range is not visible here; presumably
+     *        related to {@link #getNodeCount()} - confirm with implementations)
+     * @param eventWrapper the wrapped event to emit
+     */
+    void emit(int partitionId, EventWrapper eventWrapper);
+
+    /**
+     * @return the node count known to this emitter
+     *         (NOTE(review): exact meaning is defined by implementations)
+     */
+    int getNodeCount();
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java
new file mode 100644
index 0000000..f5ee6b2
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingEvent.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+/**
+ *
+ * <p>
+ * This class defines a checkpointing event (either a request for checkpoint or for recovery).
+ * </p>
+ * <p>
+ * Checkpointing events are queued in the PE event queue and processed according to the PE processor scheduler (FIFO).
+ * </p>
+ */
+public abstract class CheckpointingEvent {
+
+    /** Identifier carried by this event; unset until provided via constructor or setter. */
+    private SafeKeeperId safeKeeperId;
+
+    /**
+     * No-argument constructor, required by the serialization framework.
+     * The identifier is left unset.
+     */
+    public CheckpointingEvent() {
+    }
+
+    /**
+     * Creates an event for the PE that is subject to checkpointing or recovery.
+     *
+     * @param safeKeeperId identifier of the targeted PE
+     */
+    public CheckpointingEvent(SafeKeeperId safeKeeperId) {
+        this.safeKeeperId = safeKeeperId;
+    }
+
+    /**
+     * Replaces the identifier carried by this event.
+     *
+     * @param id the new identifier
+     */
+    public void setSafeKeeperId(SafeKeeperId id) {
+        this.safeKeeperId = id;
+    }
+
+    /**
+     * @return the identifier carried by this event, or {@code null} if none was set
+     */
+    public SafeKeeperId getSafeKeeperId() {
+        return safeKeeperId;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java b/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java
new file mode 100644
index 0000000..7362af8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/DefaultFileSystemStateStorage.java
@@ -0,0 +1,242 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.ft;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.log4j.Logger;
+
+/**
+ * <p>
+ * Implementation of a file system backend storage to persist checkpoints.
+ * </p>
+ * <p>
+ * The file system may be the default local file system when running on a single
+ * machine, but should be a distributed file system such as NFS when running on
+ * a cluster.
+ * </p>
+ * <p>
+ * Checkpoints are stored in individual files (1 file = 1 safeKeeperId) in
+ * directories according to the following structure:
+ * <code>(storageRootpath)/prototypeId/safeKeeperId</code>
+ * </p>
+ *
+ */
+public class DefaultFileSystemStateStorage implements StateStorage {
+
+ private static org.apache.log4j.Logger logger = Logger.getLogger(DefaultFileSystemStateStorage.class);
+ private String storageRootPath;
+
+ public DefaultFileSystemStateStorage() {
+ }
+
+ /**
+ * <p>
+ * Called by the dependency injection framework, after construction.
+ * <p/>
+ */
+ public void init() {
+ checkStorageDir();
+ }
+
+ @Override
+ public byte[] fetchState(SafeKeeperId key) {
+ File file = safeKeeperID2File(key, storageRootPath);
+ if (file != null && file.exists()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Fetching " + file.getAbsolutePath() + "for : " + key);
+ }
+ // TODO use commons-io or guava
+ FileInputStream in = null;
+ try {
+ in = new FileInputStream(file);
+
+ long length = file.length();
+
+ /*
+ * Arrays can only be created using int types, so ensure that
+ * the file size is not too big before we downcast to create the
+ * array.
+ */
+ if (length > Integer.MAX_VALUE) {
+ throw new IOException("Error file is too large: " + file.getName() + " " + length + " bytes");
+ }
+
+ byte[] buffer = new byte[(int) length];
+ int offSet = 0;
+ int numRead = 0;
+
+ while (offSet < buffer.length && (numRead = in.read(buffer, offSet, buffer.length - offSet)) >= 0) {
+ offSet += numRead;
+ }
+
+ if (offSet < buffer.length) {
+ throw new IOException("Error, could not read entire file: " + file.getName() + " " + offSet + "/"
+ + buffer.length + " bytes read");
+ }
+
+ in.close();
+ return buffer;
+ } catch (FileNotFoundException e1) {
+ logger.error(e1.getMessage(), e1);
+ } catch (IOException e2) {
+ logger.error(e2.getMessage(), e2);
+ } finally {
+ if (in != null) {
+ try {
+ in.close();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+ return null;
+
+ }
+
+ @Override
+ public Set<SafeKeeperId> fetchStoredKeys() {
+ Set<SafeKeeperId> keys = new HashSet<SafeKeeperId>();
+ File rootDir = new File(storageRootPath);
+ File[] dirs = rootDir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.isDirectory();
+ }
+ });
+ for (File dir : dirs) {
+ File[] files = dir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return (file.isFile());
+ }
+ });
+ for (File file : files) {
+ keys.add(file2SafeKeeperID(file));
+ }
+ }
+ return keys;
+ }
+
+ // files kept as : root/<prototypeId>/encodedKeyWithFullInfo
+ private static File safeKeeperID2File(SafeKeeperId key, String storageRootPath) {
+
+ return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator
+ + Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes()));
+ }
+
+ private static SafeKeeperId file2SafeKeeperID(File file) {
+ SafeKeeperId id = null;
+ id = new SafeKeeperId(new String(Base64.decodeBase64(file.getName())));
+ return id;
+ }
+
+ public String getStorageRootPath() {
+ return storageRootPath;
+ }
+
+ public void setStorageRootPath(String storageRootPath) {
+ this.storageRootPath = storageRootPath;
+ File rootPathFile = new File(storageRootPath);
+ if (!rootPathFile.exists()) {
+ if (!rootPathFile.mkdirs()) {
+ logger.error("could not create root storage directory : " + storageRootPath);
+ }
+
+ }
+ }
+
+ public void checkStorageDir() {
+ if (storageRootPath == null) {
+
+ File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+ + "storage");
+ storageRootPath = defaultStorageDir.getAbsolutePath();
+ if (logger.isInfoEnabled()) {
+ logger.info("Unspecified storage dir; using default dir: " + defaultStorageDir.getAbsolutePath());
+ }
+ if (!defaultStorageDir.exists()) {
+ if (!(defaultStorageDir.mkdirs())) {
+ logger.error("Storage directory not specified, and cannot create default storage directory : "
+ + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+ }
+ }
+ }
+ }
+
+ @Override
+ public void saveState(SafeKeeperId key, byte[] state, StorageCallback callback) {
+ File f = safeKeeperID2File(key, storageRootPath);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
+ }
+ if (!f.exists()) {
+ if (!f.getParentFile().exists()) {
+ // parent file has prototype id
+ if (!f.getParentFile().mkdir()) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
+ + f.getParentFile().getAbsolutePath());
+ return ;
+ }
+ }
+ // TODO handle IO exception
+ try {
+ f.createNewFile();
+ } catch (IOException e) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ key.toString() + " : " + e.getMessage());
+ return ;
+ }
+ } else {
+ if (!f.delete()) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ "Cannot delete previously saved checkpoint file [" + f.getParentFile().getAbsolutePath() + "]");
+ return ;
+ }
+ }
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(f);
+ fos.write(state);
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.SUCCESS, key.toString());
+ } catch (FileNotFoundException e) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ key.toString() + " : " + e.getMessage());
+ } catch (IOException e) {
+ callback.storageOperationResult(SafeKeeper.StorageResultCode.FAILURE,
+ key.toString() + " : " + e.getMessage());
+ } finally {
+ try {
+ if (fos != null) {
+ fos.close();
+ }
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+ }
+ }
+
+}