You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC
[35/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java b/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java
new file mode 100644
index 0000000..8b57721
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.adapter;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.listener.EventListener;
+import org.apache.s4.listener.EventProducer;
+import org.apache.s4.util.S4Util;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.log4j.Logger;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+
+public class Adapter implements EventHandler {
+ private static String coreHome = "../s4_core";
+
+ private EventDispatcher dispatcher;
+ private EventProducer[] eventListeners;
+ private String configFilename;
+
+ public void setDispatcher(EventDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public void setEventListeners(EventProducer[] eventListeners) {
+ this.eventListeners = eventListeners;
+ for (EventProducer eventListener : eventListeners) {
+ eventListener.addHandler(this);
+ }
+ }
+
+ public void setConfigFilename(String configFilename) {
+ this.configFilename = configFilename;
+ }
+
+ private volatile int eventCount = 0;
+ private volatile int rawEventCount = 0;
+
+ public Adapter() {
+
+ }
+
+ int counts[];
+
+ private boolean init = false;
+
+ public void init() {
+ synchronized (this) {
+ init = true;
+ }
+ }
+
+ public void processEvent(EventWrapper eventWrapper) {
+ try {
+ synchronized (this) {
+ if (!init) {
+ return;
+ }
+ rawEventCount++;
+ eventCount++;
+ }
+ dispatcher.dispatchEvent(eventWrapper.getStreamName(),
+ eventWrapper.getEvent());
+ } catch (Exception e) {
+ Logger.getLogger("dispatcher").info("Exception adapting event", e);
+ }
+ }
+
+    /**
+     * Command-line entry point for the adapter process.
+     *
+     * Options: {@code -c} core home directory (default {@code ../s4_core}),
+     * {@code -i} numeric instance id (when absent or negative, the value of
+     * {@code S4Util.getPID()} is used), {@code -t} configuration type (default
+     * {@code typical}) and {@code -d} user-defined adapter configuration file
+     * (required).
+     *
+     * Loads {@code COREHOME/conf/CONFIGTYPE/adapter-conf.xml}, then the user
+     * configuration as a child context, and registers every
+     * {@link EventProducer} bean found there with the {@code adapter} bean.
+     * Any invalid argument prints a message to stderr and exits with status 1.
+     *
+     * @param args
+     *            command-line arguments
+     */
+    @SuppressWarnings("static-access")
+    public static void main(String args[]) {
+        Options options = new Options();
+
+        options.addOption(OptionBuilder.withArgName("corehome")
+                                       .hasArg()
+                                       .withDescription("core home")
+                                       .create("c"));
+
+        options.addOption(OptionBuilder.withArgName("instanceid")
+                                       .hasArg()
+                                       .withDescription("instance id")
+                                       .create("i"));
+
+        options.addOption(OptionBuilder.withArgName("configtype")
+                                       .hasArg()
+                                       .withDescription("configuration type")
+                                       .create("t"));
+
+        options.addOption(OptionBuilder.withArgName("userconfig")
+                                       .hasArg()
+                                       .withDescription("user-defined legacy data adapter configuration file")
+                                       .create("d"));
+
+        CommandLineParser parser = new GnuParser();
+        CommandLine commandLine = null;
+
+        try {
+            commandLine = parser.parse(options, args);
+        } catch (ParseException pe) {
+            System.err.println(pe.getLocalizedMessage());
+            System.exit(1);
+        }
+
+        int instanceId = -1;
+        if (commandLine.hasOption("i")) {
+            String instanceIdStr = commandLine.getOptionValue("i");
+            try {
+                instanceId = Integer.parseInt(instanceIdStr);
+            } catch (NumberFormatException nfe) {
+                // println does no formatting, so the message must not carry "%s"
+                System.err.println("Bad instance id: " + instanceIdStr);
+                System.exit(1);
+            }
+        }
+
+        if (commandLine.hasOption("c")) {
+            coreHome = commandLine.getOptionValue("c");
+        }
+
+        String configType = "typical";
+        if (commandLine.hasOption("t")) {
+            configType = commandLine.getOptionValue("t");
+        }
+
+        // -d is mandatory: without this check new File(null) would throw a
+        // NullPointerException instead of reporting a usage error
+        String userConfigFilename = commandLine.getOptionValue("d");
+        if (userConfigFilename == null) {
+            System.err.println("Missing user configuration file (option -d)");
+            System.exit(1);
+        }
+
+        File userConfigFile = new File(userConfigFilename);
+        if (!userConfigFile.isFile()) {
+            System.err.println("Bad user configuration file: "
+                    + userConfigFilename);
+            System.exit(1);
+        }
+
+        File coreHomeFile = new File(coreHome);
+        if (!coreHomeFile.isDirectory()) {
+            System.err.println("Bad core home: " + coreHome);
+            System.exit(1);
+        }
+
+        if (instanceId > -1) {
+            System.setProperty("instanceId", "" + instanceId);
+        } else {
+            System.setProperty("instanceId", "" + S4Util.getPID());
+        }
+
+        String configBase = coreHome + File.separatorChar + "conf"
+                + File.separatorChar + configType;
+        String configPath = configBase + File.separatorChar
+                + "adapter-conf.xml";
+        File configFile = new File(configPath);
+        if (!configFile.exists()) {
+            System.err.printf("adapter config file %s does not exist\n",
+                              configPath);
+            System.exit(1);
+        }
+
+        // load adapter config xml
+        ApplicationContext context = new FileSystemXmlApplicationContext("file:"
+                + configPath);
+
+        Adapter adapter = (Adapter) context.getBean("adapter");
+
+        // the user configuration is loaded as a child of the core context
+        ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
+                                                                                    + userConfigFilename },
+                                                                            context);
+        Map<?, ?> listenerBeanMap = appContext.getBeansOfType(EventProducer.class);
+        if (listenerBeanMap.size() == 0) {
+            System.err.println("No user-defined listener beans");
+            System.exit(1);
+        }
+        EventProducer[] eventListeners = new EventProducer[listenerBeanMap.size()];
+
+        int index = 0;
+        for (Map.Entry<?, ?> entry : listenerBeanMap.entrySet()) {
+            System.out.println("Adding producer " + entry.getKey());
+            eventListeners[index++] = (EventProducer) entry.getValue();
+        }
+
+        adapter.setEventListeners(eventListeners);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/Adapter.java b/s4-core/src/main/java/org/apache/s4/client/Adapter.java
new file mode 100644
index 0000000..19a024b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/Adapter.java
@@ -0,0 +1,432 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.collector.EventListener;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.message.Request;
+import org.apache.s4.processor.AsynchronousEventProcessor;
+import org.apache.s4.util.S4Util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+
+/**
+ * Adapter to connect an S4 cluster to an external client to read from and write
+ * into the S4 cluster.
+ */
+public class Adapter {
+ private static String coreHome = "../s4_core";
+
+ // Accept events from clients and send into S4 cluster.
+ private EventDispatcher dispatcher;
+ private Writer eventWriter = new Writer();
+
+ // Listen to events from S4 cluster and send to clients.
+ private org.apache.s4.collector.EventListener clusterEventListener;
+ private Reader eventReader = new Reader();
+
+ /**
+ * Set the dispatcher to use for sending events into S4.
+ *
+ * @param dispatcher
+ */
+ public void setDispatcher(EventDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ /**
+ * clusterEventListener receives events from S4 cluster. Processing of
+ * events is delegated to the eventReader, which typically forwards them to
+ * appropriate clients.
+ *
+ * @param clusterEventListener
+ */
+ public void setClusterEventListener(EventListener clusterEventListener) {
+ this.clusterEventListener = clusterEventListener;
+ this.clusterEventListener.setEventProcessor(eventReader);
+ }
+
+ public Adapter() {
+ }
+
+ int counts[];
+
+ private boolean init = false;
+
+ public void init() {
+ synchronized (this) {
+ init = true;
+ }
+ }
+
+ /**
+ * Register a list of InputStubs that will send events into the S4 cluster.
+ * These events will be processed by the eventWriter: i.e. the eventWriter
+ * dispatches them into the S4 cluster.
+ *
+ * @param stubs
+ */
+ public void setInputStubs(List<InputStub> stubs) {
+ for (InputStub stub : stubs) {
+ // register the writer as the handler for the stub's events
+ stub.addHandler(eventWriter);
+ }
+ }
+
+ // collections of eventReceivers (OutputStubs) to which events will be sent
+ // for forwarding to clients.
+ HashMap<String, List<OutputStub>> eventReceivers = new HashMap<String, List<OutputStub>>();
+ List<OutputStub> eventReceiversAny = new ArrayList<OutputStub>();
+ List<OutputStub> eventReceiversAll = new ArrayList<OutputStub>();
+
+    /**
+     * Register the OutputStubs that consume events coming out of the S4
+     * cluster (typically to forward them to clients).
+     *
+     * Each stub is asked for {@link OutputStub#getAcceptedStreams()} and is
+     * filed into the following collections:
+     *
+     * <ol>
+     * <li>{@code eventReceivers}: stream name to the stubs that listed that
+     * stream (one-to-many). Used for routing.</li>
+     * <li>{@code eventReceiversAny}: stubs that accept every stream (
+     * {@code stub.getAcceptedStreams() == null}). Used for routing.</li>
+     * <li>{@code eventReceiversAll}: every registered stub, so that all stubs
+     * can be visited exactly once.</li>
+     * </ol>
+     *
+     * @param stubs
+     *            the list of output stubs.
+     */
+    public void setOutputStubs(List<OutputStub> stubs) {
+        eventReceiversAll.addAll(stubs);
+
+        for (OutputStub outputStub : stubs) {
+            List<String> accepted = outputStub.getAcceptedStreams();
+
+            // no explicit stream list: the stub wants everything
+            if (accepted == null) {
+                eventReceiversAny.add(outputStub);
+                continue;
+            }
+
+            // otherwise file the stub under each stream it names
+            for (String streamName : accepted) {
+                List<OutputStub> routed = eventReceivers.get(streamName);
+                if (routed == null) {
+                    routed = new ArrayList<OutputStub>();
+                    eventReceivers.put(streamName, routed);
+                }
+                routed.add(outputStub);
+            }
+        }
+    }
+
+    /**
+     * Write events from input stubs into S4 cluster.
+     */
+    private class Writer implements EventHandler {
+        // counters, only modified while holding the enclosing Adapter's monitor
+        private volatile int eventCount = 0;
+        private volatile int rawEventCount = 0;
+
+        /**
+         * Dispatch an event received from an input stub into the cluster.
+         * Events that arrive before {@link Adapter#init()} are dropped. Any
+         * exception raised while dispatching is logged and not rethrown.
+         *
+         * @param eventWrapper
+         *            supplies the stream name, compound key names and event
+         */
+        public void processEvent(EventWrapper eventWrapper) {
+            try {
+                // Lock the enclosing Adapter rather than this Writer:
+                // Adapter.init() sets the flag under the Adapter's monitor, so
+                // only the same monitor guarantees the write is visible here.
+                synchronized (Adapter.this) {
+                    if (!init) {
+                        return;
+                    }
+                    rawEventCount++;
+                    eventCount++;
+                }
+
+                // null keys => round-robin
+                // empty key list => default partitioning of underlying
+                // partitioner.
+                List<List<String>> keys = eventWrapper.getCompoundKeyNames();
+
+                String stream = eventWrapper.getStreamName();
+
+                Object event = eventWrapper.getEvent();
+
+                if (event instanceof Request) {
+                    decorateRequest((Request) event);
+                }
+
+                dispatcher.dispatchEvent(stream, keys, event);
+
+            } catch (Exception e) {
+                Logger.getLogger("adapter").info("Exception adapting event", e);
+            }
+        }
+
+        // set the return stream name to the adapter cluster name
+        private void decorateRequest(Request r) {
+            Request.RInfo rinfo = r.getRInfo();
+
+            if (rinfo instanceof Request.ClientRInfo) {
+                String cname = clusterEventListener.getRawListener()
+                                                   .getAppName();
+
+                // the return stream is the cluster name prefixed with "@"
+                ((Request.ClientRInfo) rinfo).setStream("@" + cname);
+            }
+        }
+    }
+
+ /**
+ * Read events from S4 cluster.
+ */
+ private class Reader implements AsynchronousEventProcessor {
+ /**
+ * Queue work for processing by OutputStubs. This method simply queues
+ * the events in the appropriate stubs. An event from stream K is queued
+ * in OutputStub S if either S accepts all streams, or K is contained in
+ * the list {@code S.getAcceptedStreams()}.
+ */
+ @Override
+ public void queueWork(EventWrapper eventWrapper) {
+ List<OutputStub> stubs = eventReceivers.get(eventWrapper.getStreamName());
+
+ // stubs that accept any stream
+ for (OutputStub stub : eventReceiversAny)
+ stub.queueWork(eventWrapper);
+
+ // stubs that receive this stream in particular
+ if (stubs != null)
+ for (OutputStub stub : stubs)
+ stub.queueWork(eventWrapper);
+ }
+
+ @Override
+ public int getQueueSize() {
+ int sz = 0;
+
+ for (OutputStub stub : eventReceiversAll)
+ sz += stub.getQueueSize();
+
+ return sz;
+ }
+
+ }
+
+ private static class TestDispatcher implements EventDispatcher {
+ @Override
+ public void dispatchEvent(String s, Object e) {
+ System.out.println("Dispatching event: " + s + ":" + e);
+ }
+
+ @Override
+ public void dispatchEvent(String s, List<List<String>> k, Object e) {
+ System.out.println("Dispatching event: " + s + ":" + k + ":" + e);
+ }
+ }
+
+ private static class TestReturnType {
+ private int ra;
+ private int rb;
+
+ @SuppressWarnings("unused")
+ TestReturnType() {
+ }
+
+ TestReturnType(int a, int b) {
+ this.ra = a;
+ this.rb = b;
+ }
+
+ public String toString() {
+ return "ra=" + ra + " rb=" + rb;
+ }
+ }
+
+    /**
+     * Command-line entry point for the client adapter process.
+     *
+     * Options: {@code -c} core home directory (default {@code ../s4_core}),
+     * {@code -i} numeric instance id (when absent or negative, the value of
+     * {@code S4Util.getPID()} is used), {@code -t} configuration type (default
+     * {@code typical}) and {@code -d} user-defined adapter configuration file
+     * (required).
+     *
+     * Loads {@code COREHOME/conf/CONFIGTYPE/client-adapter-conf.xml}, then
+     * the user configuration as a child context, and registers every
+     * {@link InputStub} and {@link OutputStub} bean found there with the
+     * {@code client_adapter} bean. Any invalid argument prints a message to
+     * stderr and exits with status 1.
+     *
+     * @param args
+     *            command-line arguments
+     */
+    @SuppressWarnings("static-access")
+    public static void main(String args[]) throws IOException,
+            InterruptedException {
+
+        Options options = new Options();
+
+        options.addOption(OptionBuilder.withArgName("corehome")
+                                       .hasArg()
+                                       .withDescription("core home")
+                                       .create("c"));
+
+        options.addOption(OptionBuilder.withArgName("instanceid")
+                                       .hasArg()
+                                       .withDescription("instance id")
+                                       .create("i"));
+
+        options.addOption(OptionBuilder.withArgName("configtype")
+                                       .hasArg()
+                                       .withDescription("configuration type")
+                                       .create("t"));
+
+        options.addOption(OptionBuilder.withArgName("userconfig")
+                                       .hasArg()
+                                       .withDescription("user-defined legacy data adapter configuration file")
+                                       .create("d"));
+
+        CommandLineParser parser = new GnuParser();
+        CommandLine commandLine = null;
+
+        try {
+            commandLine = parser.parse(options, args);
+        } catch (ParseException pe) {
+            System.err.println(pe.getLocalizedMessage());
+            System.exit(1);
+        }
+
+        int instanceId = -1;
+        if (commandLine.hasOption("i")) {
+            String instanceIdStr = commandLine.getOptionValue("i");
+            try {
+                instanceId = Integer.parseInt(instanceIdStr);
+            } catch (NumberFormatException nfe) {
+                // println does no formatting, so the message must not carry "%s"
+                System.err.println("Bad instance id: " + instanceIdStr);
+                System.exit(1);
+            }
+        }
+
+        if (commandLine.hasOption("c")) {
+            coreHome = commandLine.getOptionValue("c");
+        }
+
+        String configType = "typical";
+        if (commandLine.hasOption("t")) {
+            configType = commandLine.getOptionValue("t");
+        }
+
+        // -d is mandatory: without this check new File(null) would throw a
+        // NullPointerException instead of reporting a usage error
+        String userConfigFilename = commandLine.getOptionValue("d");
+        if (userConfigFilename == null) {
+            System.err.println("Missing user configuration file (option -d)");
+            System.exit(1);
+        }
+
+        File userConfigFile = new File(userConfigFilename);
+        if (!userConfigFile.isFile()) {
+            System.err.println("Bad user configuration file: "
+                    + userConfigFilename);
+            System.exit(1);
+        }
+
+        File coreHomeFile = new File(coreHome);
+        if (!coreHomeFile.isDirectory()) {
+            System.err.println("Bad core home: " + coreHome);
+            System.exit(1);
+        }
+
+        if (instanceId > -1) {
+            System.setProperty("instanceId", "" + instanceId);
+        } else {
+            System.setProperty("instanceId", "" + S4Util.getPID());
+        }
+
+        String configBase = coreHome + File.separatorChar + "conf"
+                + File.separatorChar + configType;
+        String configPath = configBase + File.separatorChar
+                + "client-adapter-conf.xml";
+        File configFile = new File(configPath);
+        if (!configFile.exists()) {
+            System.err.printf("adapter config file %s does not exist\n",
+                              configPath);
+            System.exit(1);
+        }
+
+        // load adapter config xml
+        ApplicationContext context = new FileSystemXmlApplicationContext("file:"
+                + configPath);
+
+        Adapter adapter = (Adapter) context.getBean("client_adapter");
+
+        // the user configuration is loaded as a child of the core context
+        ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
+                                                                                    + userConfigFilename },
+                                                                            context);
+
+        Map<?, ?> inputStubBeanMap = appContext.getBeansOfType(InputStub.class);
+        Map<?, ?> outputStubBeanMap = appContext.getBeansOfType(OutputStub.class);
+
+        if (inputStubBeanMap.size() == 0 && outputStubBeanMap.size() == 0) {
+            System.err.println("No user-defined input/output stub beans");
+            System.exit(1);
+        }
+
+        List<InputStub> inputStubs = new ArrayList<InputStub>(inputStubBeanMap.size());
+        List<OutputStub> outputStubs = new ArrayList<OutputStub>(outputStubBeanMap.size());
+
+        // add all input stubs
+        for (Map.Entry<?, ?> e : inputStubBeanMap.entrySet()) {
+            System.out.println("Adding InputStub " + e.getKey());
+            inputStubs.add((InputStub) e.getValue());
+        }
+
+        // add all output stubs
+        for (Map.Entry<?, ?> e : outputStubBeanMap.entrySet()) {
+            System.out.println("Adding OutputStub " + e.getKey());
+            outputStubs.add((OutputStub) e.getValue());
+        }
+
+        adapter.setInputStubs(inputStubs);
+        adapter.setOutputStubs(outputStubs);
+
+    }
+
+ public static void clientTest() throws IOException, InterruptedException {
+ BasicConfigurator.configure();
+
+ TestDispatcher disp = new TestDispatcher();
+
+ Adapter adapter = new Adapter();
+ adapter.setDispatcher(disp);
+
+ GenericJsonClientStub stub = new GenericJsonClientStub();
+ stub.setConnectionPort(2334);
+
+ InputStub[] in = { stub };
+ OutputStub[] out = { stub };
+ adapter.setInputStubs(Arrays.asList(in));
+ adapter.setOutputStubs(Arrays.asList(out));
+
+ adapter.init();
+ stub.init();
+
+ while (true) {
+ Thread.sleep(10000);
+ TestReturnType r = new TestReturnType(100, 200);
+ adapter.eventReader.queueWork(new EventWrapper("TESTSTREAM",
+ r,
+ null));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java b/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java
new file mode 100644
index 0000000..388094a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java
@@ -0,0 +1,163 @@
+package org.apache.s4.client;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.message.Request;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Connection to a client. A Stub has a collection of connections.
+ */
+public class ClientConnection {
+ /**
+ *
+ */
+ private final ClientStub clientStub;
+
+ /**
+ * TCP/IP socket used to communicate with client.
+ */
+ private final Socket socket;
+
+ public final IOChannel io;
+
+ public final ClientReadMode clientReadMode;
+ public final ClientWriteMode clientWriteMode;
+
+ /**
+ * GUID of client.
+ */
+ public final UUID uuid;
+
+ public ClientConnection(ClientStub clientStub, Socket socket, UUID uuid,
+ ClientReadMode clientReadMode, ClientWriteMode clientWriteMode)
+ throws IOException {
+ this.clientStub = clientStub;
+ this.uuid = uuid;
+ this.socket = socket;
+ this.io = this.clientStub.createIOChannel(socket);
+ this.clientReadMode = clientReadMode;
+ this.clientWriteMode = clientWriteMode;
+ }
+
+    /**
+     * Reports whether the connection to the client is still usable.
+     *
+     * {@link Socket#isConnected()} keeps returning {@code true} after the
+     * socket has been closed, so the closed state is checked as well;
+     * otherwise a connection that went through {@link #close()} would still
+     * be reported as good.
+     *
+     * @return {@code true} if the socket was connected and has not been
+     *         closed
+     */
+    public boolean good() {
+        return socket.isConnected() && !socket.isClosed();
+    }
+
+ public void close() {
+ synchronized (this.clientStub.clients) {
+ ClientStub.logger.info("closing connection to client " + uuid);
+ this.clientStub.clients.remove(this.uuid);
+ }
+
+ try {
+ socket.close();
+ } catch (IOException e) {
+ ClientStub.logger.error("problem closing client connection to client "
+ + uuid,
+ e);
+ }
+ }
+
+ private HashSet<String> includeStreams = new HashSet<String>();
+ private HashSet<String> excludeStreams = new HashSet<String>();
+
+ public void includeStreams(List<String> streams) {
+ includeStreams.addAll(streams);
+ }
+
+ public void excludeStreams(List<String> streams) {
+ excludeStreams.addAll(streams);
+ }
+
+    /**
+     * Decide whether events of the given stream should be sent to this
+     * client.
+     *
+     * <ul>
+     * <li>read mode All: every stream is accepted;</li>
+     * <li>read mode Select: the stream is accepted when the include set is
+     * empty or contains it, AND the exclude set is empty or does not contain
+     * it;</li>
+     * <li>any other read mode (None, Private): nothing is accepted.</li>
+     * </ul>
+     *
+     * @param s
+     *            stream name
+     * @return true if and only if stream is accepted
+     */
+    public boolean streamAccepted(String s) {
+        switch (clientReadMode) {
+        case All:
+            return true;
+
+        case Select:
+            // decided by the include/exclude filters below
+            break;
+
+        default:
+            return false;
+        }
+
+        if (!includeStreams.isEmpty() && !includeStreams.contains(s)) {
+            return false;
+        }
+        return excludeStreams.isEmpty() || !excludeStreams.contains(s);
+    }
+
+ private Thread receiverThread = null;
+
+ public void start() {
+ if (clientWriteMode == ClientWriteMode.Enabled)
+ (receiverThread = new Thread(receiver)).start();
+ }
+
+ public void stop() {
+ if (receiverThread != null) {
+ receiverThread.interrupt();
+ receiverThread = null;
+ }
+ }
+
+ public final Runnable receiver = new Runnable() {
+ public void run() {
+ try {
+ while (good()) {
+ byte[] b = io.recv();
+
+ // null, empty => goodbye
+ if (b == null || b.length == 0) {
+ ClientStub.logger.info("client session ended " + uuid);
+ break;
+ }
+
+ EventWrapper ew = ClientConnection.this.clientStub.eventWrapperFromBytes(b);
+ if (ew == null)
+ continue;
+
+ Object event = ew.getEvent();
+ if (event instanceof Request) {
+ decorateRequest((Request) event);
+ ClientStub.logger.info("Decorated client request: "
+ + ew.toString());
+ }
+
+ ClientConnection.this.clientStub.injectEvent(ew);
+ }
+ } catch (IOException e) {
+ ClientStub.logger.info("error while reading from client "
+ + uuid, e);
+
+ } finally {
+ close();
+ }
+ }
+
+ private void decorateRequest(Request r) {
+ // add UUID of client into request.
+ Request.RInfo info = r.getRInfo();
+
+ if (info != null && info instanceof Request.ClientRInfo)
+ ((Request.ClientRInfo) info).setRequesterUUID(ClientConnection.this.uuid);
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java b/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java
new file mode 100644
index 0000000..03df61a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java
@@ -0,0 +1,37 @@
+package org.apache.s4.client;
+
+/**
+ * Client read mode. Each mode carries two flags, exposed through
+ * {@link #takePrivate()} and {@link #takePublic()}.
+ */
+public enum ClientReadMode {
+    None(false, false), Private(true, false), Select(true, true), All(true, true);
+
+    private final boolean priv;
+    private final boolean pub;
+
+    ClientReadMode(boolean priv, boolean pub) {
+        this.priv = priv;
+        this.pub = pub;
+    }
+
+    /**
+     * @return the mode's "public" flag (set for Select and All)
+     */
+    public boolean takePublic() {
+        return pub;
+    }
+
+    /**
+     * @return the mode's "private" flag (set for every mode except None)
+     */
+    public boolean takePrivate() {
+        return priv;
+    }
+
+    /**
+     * Look up a mode by name, ignoring case.
+     *
+     * @param s
+     *            mode name ("none", "private", "select" or "all"); may be
+     *            {@code null}
+     * @return the matching mode, or {@code null} if {@code s} is
+     *         {@code null} or names no mode
+     */
+    public static ClientReadMode fromString(String s) {
+        // name().equalsIgnoreCase(null) is simply false, so a null argument
+        // falls through to the "unknown" result instead of throwing
+        for (ClientReadMode mode : values()) {
+            if (mode.name().equalsIgnoreCase(s)) {
+                return mode;
+            }
+        }
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientStub.java b/s4-core/src/main/java/org/apache/s4/client/ClientStub.java
new file mode 100644
index 0000000..f1acd44
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientStub.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.message.Request;
+import org.apache.s4.message.Response;
+import org.apache.s4.util.ByteArrayIOChannel;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+public abstract class ClientStub implements OutputStub, InputStub {
+
+ protected static final Logger logger = Logger.getLogger("adapter");
+
+ /**
+ * Description of the protocol implemented by a concrete instance of this
+ * stub.
+ */
+ public static class Info {
+ public final String name;
+ public final int versionMajor;
+ public final int versionMinor;
+
+ public Info(String name, int versionMajor, int versionMinor) {
+ this.name = name;
+ this.versionMajor = versionMajor;
+ this.versionMinor = versionMinor;
+ }
+ }
+
+ /**
+ * Meta-information about the protocol that this stub uses to talk to
+ * external clients.
+ *
+ * This is sent to the client as a part of the handshake.
+ */
+ abstract public Info getProtocolInfo();
+
+ /**
+ * Stream names that are accepted by this stub to be forwarded to its
+ * clients.
+ */
+ @Override
+ public List<String> getAcceptedStreams() {
+ return null;
+ }
+
+ private List<EventHandler> handlers = new ArrayList<EventHandler>();
+
+ /**
+ * A handler that can inject events produced by this stub into the S4
+ * cluster.
+ */
+ @Override
+ public void addHandler(EventHandler handler) {
+ this.handlers.add(handler);
+ }
+
+ /**
+ * Remove a handler.
+ */
+ @Override
+ public boolean removeHandler(EventHandler handler) {
+ return handlers.remove(handler);
+ }
+
+ /**
+ * Convert an array of bytes into an event wrapper. This method is used to
+ * translate data received from a client into events that may be injected
+ * into the S4 cluster.
+ *
+ * @param v
+ * array of bytes
+ * @return EventWrapper constructed from the byte array.
+ */
+ abstract public EventWrapper eventWrapperFromBytes(byte[] v);
+
+ /**
+ * Convert an event wrapper into a byte array. Events received from the S4
+ * cluster for dispatching to a client are translated into a byte array
+ * using this method.
+ *
+ * @param e
+ * an {@link EventWrapper}
+ * @return a byte array
+ */
+ abstract public byte[] bytesFromEventWrapper(EventWrapper e);
+
+ /**
+ * Construct an I/O channel over which the stub can communicate with a
+ * client. The channel allows arrays of bytes to be exchanged between the
+ * stub and client.
+ *
+ * @param socket
+ * TCP/IP socket
+ * @return an IO Channel to send and recv byte arrays
+ * @throws IOException
+ * if the underlying socket could not provide valid input and
+ * output streams.
+ */
+ public IOChannel createIOChannel(Socket socket) throws IOException {
+ return new ByteArrayIOChannel(socket);
+ }
+
+ // send an event into the cluster via adapter.
+ void injectEvent(EventWrapper e) {
+ for (EventHandler handler : handlers) {
+ handler.processEvent(e);
+ }
+ }
+
+ // private List<ClientConnection> clients = new
+ // ArrayList<ClientConnection>();
+ HashMap<UUID, ClientConnection> clients = new HashMap<UUID, ClientConnection>();
+
+ /**
+ * Add an established client connection to the clients map, keyed by its UUID.
+ *
+ * @param c
+ * the client connection to register
+ */
+ private void addClient(ClientConnection c) {
+ synchronized (clients) {
+ logger.info("adding client " + c.uuid);
+ clients.put(c.uuid, c);
+ }
+ }
+
+ LinkedBlockingQueue<EventWrapper> queue = new LinkedBlockingQueue<EventWrapper>();
+
+ @Override
+ public int getQueueSize() {
+ return queue.size();
+ }
+
+ @Override
+ public void queueWork(EventWrapper e) {
+ queue.offer(e);
+ }
+
+ ServerSocket serverSocket = null;
+
+ public void setConnectionPort(int port) throws IOException {
+ serverSocket = new ServerSocket(port);
+ }
+
+ private Thread acceptThread = null;
+ private Thread senderThread = null;
+
+ public void init() {
+ // start accepting new clients and sending events to them
+ (acceptThread = new Thread(connectionListener)).start();
+ (senderThread = new Thread(sender)).start();
+ }
+
+ public void shutdown() {
+ // stop accepting new clients
+ if (acceptThread != null) {
+ acceptThread.interrupt();
+ acceptThread = null;
+ }
+
+ // stop sending events to them.
+ if (senderThread != null) {
+ senderThread.interrupt();
+ senderThread = null;
+ }
+
+ // stop all connected clients.
+ List<ClientConnection> clientCopy = new ArrayList<ClientConnection>(clients.values());
+ for (ClientConnection c : clientCopy) {
+ c.stop();
+ c.close();
+ }
+ }
+
+ private final Runnable connectionListener = new Runnable() {
+
+ Handshake handshake = null;
+
+ public void run() {
+ if (handshake == null)
+ handshake = new Handshake(ClientStub.this);
+
+ try {
+ while (serverSocket != null && serverSocket.isBound()
+ && !Thread.currentThread().isInterrupted()) {
+
+ Socket socket = serverSocket.accept();
+
+ ClientConnection connection = handshake.execute(socket);
+
+ if (connection != null) {
+ addClient(connection);
+ connection.start();
+ }
+
+ }
+ } catch (IOException e) {
+ logger.info("exception in client connection listener", e);
+ }
+ }
+
+ };
+
+    /**
+     * Task that drains the event queue and forwards each event to clients.
+     * Responses are delivered only to the client that issued the request;
+     * every other event is offered to all connected clients that accept its
+     * stream. The task ends when its thread is interrupted.
+     */
+    public final Runnable sender = new Runnable() {
+
+        public void run() {
+
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    EventWrapper event = queue.take();
+
+                    // Responses need special handling.
+                    if (event.getEvent() instanceof Response) {
+                        dispatchResponse(event);
+                        continue;
+                    }
+
+                    // TODO: include check to see if the event belongs to a
+                    // particular client.
+
+                    dispatchToAllClients(event);
+
+                } catch (InterruptedException e) {
+                    // keep the interrupt visible to whoever owns this thread
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+
+        // send the event to every good client that accepts its stream
+        private void dispatchToAllClients(EventWrapper event) {
+
+            byte[] b = bytesFromEventWrapper(event);
+            if (b == null) {
+                // nothing to send; mirrors the null check in dispatchResponse
+                return;
+            }
+            String stream = event.getStreamName();
+
+            // connections whose send failed; closed after the iteration
+            // because close() removes entries from the map being iterated
+            List<ClientConnection> disconnect = new ArrayList<ClientConnection>();
+
+            synchronized (clients) {
+                for (ClientConnection c : clients.values()) {
+                    if (c.good() && c.streamAccepted(stream)) {
+                        try {
+                            c.io.send(b);
+
+                        } catch (IOException e) {
+                            logger.error("error sending message to client "
+                                    + c.uuid + ". disconnecting", e);
+
+                            disconnect.add(c);
+                        }
+                    }
+                }
+            }
+
+            for (ClientConnection d : disconnect) {
+                d.close();
+            }
+        }
+
+        // send a response only to the client whose UUID is in its RInfo
+        private void dispatchResponse(EventWrapper event) {
+            Response res = (Response) event.getEvent();
+            Request.RInfo rinfo = res.getRInfo();
+
+            if (rinfo instanceof Request.ClientRInfo) {
+                UUID uuid = ((Request.ClientRInfo) rinfo).getRequesterUUID();
+
+                // the map is a plain HashMap mutated by other threads under
+                // this lock, so the lookup must hold it too
+                ClientConnection c;
+                synchronized (clients) {
+                    c = clients.get(uuid);
+                }
+
+                if (c != null && c.good() && c.clientReadMode.takePrivate()) {
+                    try {
+                        byte[] b = bytesFromEventWrapper(event);
+                        if (b != null) {
+                            c.io.send(b);
+                        }
+
+                    } catch (IOException e) {
+                        logger.error("error sending response to client "
+                                + c.uuid + ". disconnecting", e);
+
+                        c.close();
+                    }
+
+                } else {
+                    logger.warn("no active client found for response: " + res);
+                }
+            }
+        }
+    };
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java b/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java
new file mode 100644
index 0000000..e856ea7
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java
@@ -0,0 +1,17 @@
+package org.apache.s4.client;
+
+/**
+ * Client's write mode.
+ */
+public enum ClientWriteMode {
+    Enabled, Disabled;
+
+    /**
+     * Parses a write mode from its name, ignoring case.
+     *
+     * @param s the mode name, e.g. {@code "enabled"} or {@code "Disabled"};
+     *          may be {@code null}
+     * @return the matching mode, or {@code null} if {@code s} is
+     *         {@code null} or names no mode
+     */
+    public static ClientWriteMode fromString(String s) {
+        if (s == null) {
+            // Treat a missing value like any other unknown mode.
+            return null;
+        }
+
+        for (ClientWriteMode mode : values()) {
+            if (mode.name().equalsIgnoreCase(s)) {
+                return mode;
+            }
+        }
+
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java b/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java
new file mode 100644
index 0000000..6268407
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.client.util.ObjectBuilder;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.util.GsonUtil;
+
+import java.nio.charset.Charset;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Client stub speaking the "generic-json" protocol: each message is a JSON
+ * object with the fields {@code stream}, {@code class}, {@code object} and,
+ * on input, an optional {@code keys} array.
+ */
+public class GenericJsonClientStub extends ClientStub {
+
+    /** Charset used when converting messages to and from bytes. */
+    private static final Charset UTF8 = Charset.forName("UTF8");
+
+    private static final Info protocolInfo = new Info("generic-json", 1, 0);
+
+    @Override
+    public Info getProtocolInfo() {
+        return protocolInfo;
+    }
+
+    /**
+     * Decodes a JSON message into an event wrapper.
+     *
+     * @param v the bytes of the JSON message
+     * @return the decoded wrapper, or {@code null} if the JSON is invalid
+     *         or names a class that cannot be loaded
+     */
+    @Override
+    public EventWrapper eventWrapperFromBytes(byte[] v) {
+        try {
+            JSONObject message = new JSONObject(new String(v, UTF8));
+
+            String streamName = message.getString("stream");
+            String className = message.getString("class");
+            Class<?> eventClass = resolveEventClass(className);
+            String[] keyNames = readKeyNames(message.optJSONArray("keys"));
+            String encodedEvent = message.getString("object");
+
+            Object eventObject = GsonUtil.get().fromJson(encodedEvent,
+                                                         eventClass);
+
+            return new EventWrapper(streamName, keyNames, eventObject);
+
+        } catch (JSONException e) {
+            logger.error("problem with event JSON", e);
+            return null;
+        } catch (ObjectBuilder.Exception e) {
+            logger.error("failed to build object from JSON", e);
+            return null;
+        }
+    }
+
+    /**
+     * Loads the class named in a message.
+     * NOTE(review): the class name is taken verbatim from the message, so
+     * the sender chooses which class gets loaded - confirm this is
+     * acceptable for the deployments that expose this stub.
+     */
+    private static Class<?> resolveEventClass(String className)
+            throws ObjectBuilder.Exception {
+        try {
+            return Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            throw new ObjectBuilder.Exception("bad class name for json-encoded object: "
+                                                      + className,
+                                              e);
+        }
+    }
+
+    /** Copies the optional "keys" array into a String[]; null stays null. */
+    private static String[] readKeyNames(JSONArray keyArray) {
+        if (keyArray == null) {
+            return null;
+        }
+
+        String[] names = new String[keyArray.length()];
+        for (int idx = 0; idx < names.length; ++idx) {
+            names[idx] = keyArray.optString(idx);
+        }
+        return names;
+    }
+
+    /**
+     * Encodes an event wrapper as a JSON message.
+     *
+     * @param ew the wrapper to encode
+     * @return the bytes of the JSON message, or {@code null} if encoding
+     *         failed
+     */
+    @Override
+    public byte[] bytesFromEventWrapper(EventWrapper ew) {
+        JSONObject message = new JSONObject();
+        Object payload = ew.getEvent();
+
+        try {
+            message.put("stream", ew.getStreamName());
+            message.put("class", payload.getClass().getName());
+            message.put("object", GsonUtil.get().toJson(payload));
+
+            String text = message.toString();
+            return text.getBytes(UTF8);
+
+        } catch (JSONException e) {
+            logger.error("exception while converting event wrapper to bytes.",
+                         e);
+            return null;
+        } catch (IllegalArgumentException iae) {
+            logger.error("exception while converting event wrapper to bytes.",
+                         iae);
+            logger.error(payload);
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/Handshake.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/Handshake.java b/s4-core/src/main/java/org/apache/s4/client/Handshake.java
new file mode 100644
index 0000000..e149006
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/Handshake.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.client.ClientStub.Info;
+import org.apache.s4.util.ByteArrayIOChannel;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.gson.Gson;
+
+/**
+ * Performs the initial handshake with a client socket.
+ * <p>
+ * The first message received decides the path: an empty message is a
+ * discovery request, answered with a freshly generated UUID and the stub's
+ * protocol info; a non-empty message is parsed as a JSON connect request
+ * from which a {@link ClientConnection} is built.
+ */
+public class Handshake {
+    /**
+     * Charset of the JSON handshake messages. Stated explicitly so that the
+     * bytes on the wire do not depend on the JVM's platform default.
+     */
+    private static final java.nio.charset.Charset UTF8 = java.nio.charset.Charset.forName("UTF-8");
+
+    private final ClientStub clientStub;
+
+    protected static final Logger logger = Logger.getLogger("adapter");
+
+    protected final Gson gson = new Gson();
+
+    public Handshake(ClientStub clientStub) {
+        this.clientStub = clientStub;
+    }
+
+    /**
+     * Executes the handshake with a client.
+     * <ol>
+     * <li>discovery: issue uuid;</li>
+     * <li>main connect: create connection.</li>
+     * </ol>
+     * The socket is closed whenever no connection results, which includes
+     * the discovery path.
+     *
+     * @param s the freshly accepted client socket
+     * @return the established connection, or {@code null} if none was
+     *         created
+     * @throws RuntimeException if the socket cannot be closed after a
+     *         failed handshake
+     */
+    public ClientConnection execute(Socket s) {
+        byte[] v;
+
+        try {
+            ClientConnection conn = null;
+
+            ByteArrayIOChannel io = new ByteArrayIOChannel(s);
+
+            v = io.recv();
+
+            if (v == null || v.length == 0) {
+                // no information => client initialization
+                clientInit(io);
+
+            } else {
+                // some data => client connect
+                conn = clientConnect(v, io, s);
+            }
+
+            if (conn == null)
+                s.close();
+
+            return conn;
+
+        } catch (IOException e) {
+            logger.error("exception during handshake", e);
+            try {
+                s.close();
+                return null;
+
+            } catch (IOException ee) {
+                throw new RuntimeException("failed to close socket after failed handshake",
+                                           ee);
+            }
+        }
+    }
+
+    /**
+     * Handles a connect request and reports the outcome to the client as a
+     * JSON object with a {@code status} of "ok" or "failed" and, on failure,
+     * a {@code reason} when one is known.
+     *
+     * @return the connection, or {@code null} if the request was rejected
+     *         or the response could not be built
+     */
+    private ClientConnection clientConnect(byte[] v, ByteArrayIOChannel io,
+                                           Socket sock) throws IOException {
+
+        List<String> reason = new ArrayList<String>(1);
+        ClientConnection conn = clientConnectCreate(v, io, sock, reason);
+
+        String message = null;
+        try {
+            JSONObject resp = new JSONObject();
+
+            resp.put("status", (conn != null ? "ok" : "failed"));
+
+            if (conn == null && !reason.isEmpty()) {
+                resp.put("reason", reason.get(0));
+            }
+
+            message = resp.toString();
+
+        } catch (JSONException e) {
+            logger.error("error creating response during connect.", e);
+            return null;
+        }
+
+        io.send(message.getBytes(UTF8));
+
+        return conn;
+    }
+
+    /**
+     * Parses and validates a connect request and builds the connection.
+     * The request is validated completely before the
+     * {@link ClientConnection} is constructed, so a rejected request never
+     * leaves a half-configured connection object behind.
+     *
+     * @param v      the raw JSON connect request
+     * @param io     the channel the request arrived on (unused here)
+     * @param sock   the client socket handed to the new connection
+     * @param reason receives a short rejection reason when the result is
+     *               {@code null}
+     * @return the configured connection, or {@code null} if rejected
+     */
+    private ClientConnection clientConnectCreate(byte[] v,
+                                                 ByteArrayIOChannel io,
+                                                 Socket sock,
+                                                 List<String> reason)
+            throws IOException {
+
+        try {
+            JSONObject cInfo = new JSONObject(new String(v, UTF8));
+
+            String s = cInfo.optString("uuid", "");
+            if (s.isEmpty()) {
+                logger.error("missing client identifier during handshake.");
+                reason.add("missing UUID");
+                return null;
+            }
+
+            UUID u = UUID.fromString(s);
+
+            logger.info("connecting to client " + u);
+
+            s = cInfo.optString("readMode", "Private");
+            ClientReadMode rmode = ClientReadMode.fromString(s);
+            if (rmode == null) {
+                logger.error(u + ": unknown readMode " + s);
+                reason.add("unknown readMode " + s);
+                return null;
+            }
+
+            s = cInfo.optString("writeMode", "Enabled");
+            ClientWriteMode wmode = ClientWriteMode.fromString(s);
+            if (wmode == null) {
+                logger.error(u + ": unknown writeMode " + s);
+                reason.add("unknown writeMode " + s);
+                return null;
+            }
+
+            logger.info(u + " read=" + rmode + " write=" + wmode);
+
+            if (rmode == ClientReadMode.None
+                    && wmode == ClientWriteMode.Disabled) {
+                // client cannot disable read AND write...
+                logger.error("client neither reads nor writes.");
+                reason.add("read and write disabled");
+
+                return null;
+            }
+
+            List<String> include = null;
+            List<String> exclude = null;
+
+            if (rmode == ClientReadMode.Select) {
+                JSONArray incl = cInfo.optJSONArray("readInclude");
+                JSONArray excl = cInfo.optJSONArray("readExclude");
+
+                if (incl == null && excl == null) {
+                    logger.error(u + ": missing stream selection information");
+                    reason.add("missing readInclude and readExclude");
+                    return null;
+                }
+
+                include = toStringList(incl);
+                exclude = toStringList(excl);
+            }
+
+            ClientConnection conn = new ClientConnection(clientStub, sock, u, rmode, wmode);
+
+            if (include != null)
+                conn.includeStreams(include);
+
+            if (exclude != null)
+                conn.excludeStreams(exclude);
+
+            return conn;
+
+        } catch (JSONException e) {
+            logger.error("malformed JSON from client during handshake", e);
+            reason.add("malformed JSON");
+
+        } catch (IllegalArgumentException e) {
+            // also covers NumberFormatException, which is a subclass
+            logger.error("received malformed UUID", e);
+            reason.add("malformed UUID");
+        }
+
+        return null;
+    }
+
+    /** Copies a JSON array of strings into a list; null stays null. */
+    private static List<String> toStringList(JSONArray array)
+            throws JSONException {
+        if (array == null)
+            return null;
+
+        List<String> streams = new ArrayList<String>(array.length());
+        for (int i = 0; i < array.length(); ++i)
+            streams.add(array.getString(i));
+
+        return streams;
+    }
+
+    /**
+     * Answers a discovery request with a newly generated UUID and the
+     * stub's protocol info, encoded as one JSON line.
+     */
+    private void clientInit(ByteArrayIOChannel io) throws IOException {
+        String uuid = UUID.randomUUID().toString();
+        Info proto = clientStub.getProtocolInfo();
+
+        HashMap<String, Object> info = new HashMap<String, Object>();
+        info.put("uuid", uuid);
+        info.put("protocol", proto);
+
+        String response = gson.toJson(info) + "\n";
+
+        io.send(response.getBytes(UTF8));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/IOChannel.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/IOChannel.java b/s4-core/src/main/java/org/apache/s4/client/IOChannel.java
new file mode 100644
index 0000000..e5c0414
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/IOChannel.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client;
+
+import java.io.IOException;
+
+/**
+ * A channel that receives and sends byte arrays.
+ * NOTE(review): message framing and blocking behaviour are decided by the
+ * implementations, which are not shown here - confirm before relying on them.
+ */
+public interface IOChannel {
+ /**
+ * Receives data from the channel.
+ *
+ * @return the received bytes
+ * @throws IOException if receiving fails
+ */
+ byte[] recv() throws IOException;
+
+ /**
+ * Sends data over the channel.
+ *
+ * @param v the bytes to send
+ * @throws IOException if sending fails
+ */
+ void send(byte[] v) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/InputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/InputStub.java b/s4-core/src/main/java/org/apache/s4/client/InputStub.java
new file mode 100644
index 0000000..63ca95f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/InputStub.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.listener.EventProducer;
+
+/**
+ * An {@link EventProducer} that declares no members of its own.
+ * NOTE(review): presumably names the input side of a client stub - confirm
+ * against the implementing classes.
+ */
+public interface InputStub extends EventProducer {
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/OutputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/OutputStub.java b/s4-core/src/main/java/org/apache/s4/client/OutputStub.java
new file mode 100644
index 0000000..59709d8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/OutputStub.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.processor.AsynchronousEventProcessor;
+
+import java.util.List;
+
+/**
+ * An {@link AsynchronousEventProcessor} that additionally reports a list of
+ * accepted streams.
+ */
+public interface OutputStub extends AsynchronousEventProcessor {
+
+ /**
+ * @return the accepted streams, as a list of strings.
+ * NOTE(review): presumably stream names used to filter what is
+ * queued to this stub - confirm against the callers.
+ */
+ List<String> getAcceptedStreams();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
new file mode 100644
index 0000000..bfb6eeb
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.client.util;
+
+import org.apache.s4.message.Request;
+import org.apache.s4.message.SinglePERequest;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.InstanceCreator;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import flexjson.JSONDeserializer;
+import flexjson.JSONSerializer;
+
+/**
+ * Converts objects to and from JSON using flexjson, caching one deserializer
+ * per target class. The {@code main} method is an ad-hoc demo that also
+ * exercises Gson on the request message types.
+ */
+public class ObjectBuilder {
+
+    /** One deserializer per target class, created on first use. */
+    private final ConcurrentHashMap<Class<?>, JSONDeserializer<Object>> deserializers = new ConcurrentHashMap<Class<?>, JSONDeserializer<Object>>();
+
+    /** Serializer that leaves out the "class" property. */
+    private final JSONSerializer serializer = (new JSONSerializer()).exclude("class");
+
+    /**
+     * Deserializes a JSON string into an instance of the given class.
+     *
+     * @param jevent the JSON text
+     * @param clazz  the class to instantiate
+     * @return the deserialized object
+     * @throws ObjectBuilder.Exception declared for callers; not thrown by
+     *         the code in this method
+     */
+    public Object fromJson(String jevent, Class<?> clazz)
+            throws ObjectBuilder.Exception {
+
+        JSONDeserializer<Object> deser = deserializers.get(clazz);
+
+        if (deser == null) {
+            JSONDeserializer<Object> newDeser = new JSONDeserializer<Object>();
+            newDeser.use(null, clazz);
+
+            // If another thread registered one first, use that instance.
+            deser = deserializers.putIfAbsent(clazz, newDeser);
+
+            if (deser == null)
+                deser = newDeser;
+        }
+
+        return deser.deserialize(jevent);
+
+    }
+
+    /**
+     * Serializes an object to JSON, without the "class" property.
+     *
+     * @param e the object to serialize
+     * @return the JSON text
+     */
+    public String toJson(Object e) {
+        return serializer.serialize(e);
+    }
+
+    /**
+     * Checked exception for object building failures. Inside this class the
+     * simple name {@code Exception} refers to this type, not to
+     * {@code java.lang.Exception}.
+     */
+    public static class Exception extends java.lang.Exception {
+        public Exception(String message) {
+            super(message);
+        }
+
+        public Exception(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    /** Demo bean; the setter and the two-argument constructor scale by 10. */
+    private static class TEST {
+        private int a;
+        private int b;
+
+        public void setA(int a) {
+            this.a = a * 10;
+        }
+
+        public String toString() {
+            return "" + a + " " + b;
+        }
+
+        public int getA() {
+            return a;
+        }
+
+        public int getB() {
+            return b;
+        }
+
+        public TEST(int a, int b) {
+            this.a = a * 10;
+            this.b = b * 10;
+        }
+
+        public TEST() {
+        }
+    }
+
+    /** Ad-hoc demo printing flexjson and Gson round trips to stdout. */
+    public static void main(String[] argv) throws Exception {
+
+        ObjectBuilder b = new ObjectBuilder();
+
+        String s = "{a:5, b:100}";
+        Object out = b.fromJson(s, TEST.class);
+
+        System.out.println(out.toString());
+
+        TEST t = new TEST(1, 2);
+
+        System.out.println(b.toJson(t));
+
+        String[] query = { "name", "count", "freq" };
+        String target[] = { "ACDW", "11" };
+
+        // These two statements still named the old io.s4.message package
+        // after the rename; they now use the imported org.apache.s4 types.
+        Request.ClientRInfo rinfo = new Request.ClientRInfo();
+        rinfo.setRequesterUUID(UUID.randomUUID());
+        Request req = new SinglePERequest(Arrays.asList(target),
+                                          Arrays.asList(query),
+                                          rinfo);
+
+        System.out.println(req.toString());
+
+        InstanceCreator<Request.RInfo> infoCreator = new InstanceCreator<Request.RInfo>() {
+            public Request.RInfo createInstance(Type type) {
+                return new Request.ClientRInfo();
+            }
+        };
+
+        Gson gson = (new GsonBuilder()).registerTypeAdapter(Request.RInfo.class,
+                                                            infoCreator)
+                                       .registerTypeAdapter(Object.class,
+                                                            new ObjectTypeAdapter())
+                                       .create();
+
+        System.out.println("gson: " + gson.toJson(req));
+        System.out.println("gson reversed: "
+                + gson.fromJson(gson.toJson(req), SinglePERequest.class));
+
+        System.out.println(b.toJson(req));
+        System.out.println(b.toJson(Arrays.asList(query)));
+
+        System.out.println("----------------------------------------------");
+
+        ArrayList<SSTest> list = new ArrayList<SSTest>();
+
+        SSTest ss1 = new SSTest();
+        ss1.str = "list-element-1";
+        SSTest ss2 = new SSTest();
+        ss2.str = "list-element-2";
+
+        list.add(ss1);
+        list.add(ss2);
+
+        Map<String, Object> listmap = new HashMap<String, Object>();
+        listmap.put("ll", list);
+
+        MapTest mt = new MapTest();
+        mt.map = listmap;
+
+        Object listmapobj = listmap;
+
+        System.out.println("list: " + gson.toJson(list));
+        System.out.println("listmap: " + gson.toJson(listmap));
+        System.out.println("listmapobj: " + gson.toJson(listmapobj));
+        System.out.println("mapobject: " + gson.toJson(mt));
+    }
+
+    private static class SSTest {
+        public String str;
+    }
+
+    private static class MapTest {
+        Map<String, Object> map;
+        Map gmap;
+    }
+
+    /**
+     * Gson serializer registered for {@code Object}: serializes a value by
+     * its runtime class, and a plain {@code Object} as an empty JSON object.
+     */
+    private static class ObjectTypeAdapter implements JsonSerializer<Object> {
+        public JsonElement serialize(Object src, Type typeOfSrc,
+                                     JsonSerializationContext context) {
+
+            if (src.getClass() != Object.class) {
+                return context.serialize(src, src.getClass());
+            }
+
+            return new JsonObject();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/Event.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/Event.java b/s4-core/src/main/java/org/apache/s4/collector/Event.java
new file mode 100644
index 0000000..404c724
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/Event.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.collector;
+
+import org.apache.s4.dispatcher.Dispatcher;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An {@link EventRecord} that additionally exposes the event name, the
+ * timestamp and the compound key information stored in the record under
+ * {@link #EVENT_NAME_KEY}, {@link #TIMESTAMP_KEY} and
+ * {@code Dispatcher.PARTITION_INFO_KEY}.
+ */
+public class Event extends EventRecord {
+    public static final String EVENT_NAME_KEY = "S4__eventName";
+    public static final String TIMESTAMP_KEY = "S4__timestamp";
+
+    private String eventName;
+    private long timestamp;
+    private List<CompoundKeyInfo> compoundKeys = new ArrayList<CompoundKeyInfo>();
+    private boolean debug = false;
+
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    /**
+     * Builds the event from raw event data, reading the name (default
+     * "unknown"), the timestamp (default -1) and the partition info.
+     *
+     * @param passedEventData the raw event data
+     */
+    public Event(Map<String, Object> passedEventData) {
+        super(passedEventData);
+
+        eventName = this.get(EVENT_NAME_KEY, "unknown");
+        timestamp = this.get(TIMESTAMP_KEY, -1L);
+
+        // The default is returned by identity when the key is absent.
+        List<EventRecord> plainCompoundKeys = get(Dispatcher.PARTITION_INFO_KEY,
+                                                  EMPTY_LIST);
+        if (plainCompoundKeys != EMPTY_LIST) {
+            for (EventRecord plainCompoundKey : plainCompoundKeys) {
+                CompoundKeyInfo info = new CompoundKeyInfo();
+                info.setCompoundValue(plainCompoundKey.get("compoundValue",
+                                                           (String) null));
+                info.setCompoundKey(plainCompoundKey.get("compoundKey",
+                                                         (String) null));
+                compoundKeys.add(info);
+                addKeyInfos(info, plainCompoundKey);
+            }
+        }
+
+        // NOTE(review): debug still has its initial value (false) here,
+        // because setDebug can only be called after construction.
+        if (debug) {
+            printCompoundKeys();
+        }
+    }
+
+    /** Converts each entry of "keyInfoList" into a KeyInfo on the target. */
+    private static void addKeyInfos(CompoundKeyInfo target,
+                                    EventRecord plainCompoundKey) {
+        for (EventRecord plainKeyInfo : plainCompoundKey.get("keyInfoList",
+                                                             EMPTY_LIST)) {
+            KeyInfo keyInfo = new KeyInfo();
+            for (EventRecord element : plainKeyInfo.get("keyPathElementList",
+                                                        EMPTY_LIST)) {
+                String keyName = element.get("keyName", (String) null);
+                Integer index = element.get("index", (Integer) null);
+
+                // A name takes precedence over an index.
+                if (keyName != null) {
+                    keyInfo.addElementToPath(keyName);
+                } else if (index != null) {
+                    keyInfo.addElementToPath(index);
+                }
+            }
+            target.addKeyInfo(keyInfo);
+        }
+    }
+
+    /** Prints every compound value and its key paths to standard output. */
+    private void printCompoundKeys() {
+        for (CompoundKeyInfo info : compoundKeys) {
+            System.out.println("CompoundKey: " + info.getCompoundValue());
+            for (KeyInfo keyInfo : info.getKeyInfoList()) {
+                StringBuilder path = new StringBuilder();
+                for (KeyPathElement element : keyInfo.getKeyPath()) {
+                    if (element instanceof KeyPathElementIndex) {
+                        path.append("[")
+                            .append(((KeyPathElementIndex) element).getIndex())
+                            .append("]");
+                    } else {
+                        if (path.length() > 0) {
+                            path.append("/");
+                        }
+                        path.append(((KeyPathElementName) element).getKeyName());
+                    }
+                }
+                System.out.println(" " + path);
+            }
+        }
+    }
+
+    public List<CompoundKeyInfo> getCompoundKeys() {
+        return compoundKeys;
+    }
+
+    public String getEventName() {
+        return eventName;
+    }
+
+    public long getTimeStamp() {
+        return timestamp;
+    }
+
+    /**
+     * @return the raw partition info stored in the record, or a new empty
+     *         list if there is none
+     */
+    public List<Map<String, Object>> getCompoundKeyList() {
+        return get(Dispatcher.PARTITION_INFO_KEY,
+                   new ArrayList<Map<String, Object>>());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/EventListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/EventListener.java b/s4-core/src/main/java/org/apache/s4/collector/EventListener.java
new file mode 100644
index 0000000..721d600
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/EventListener.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.collector;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.S4_EVENT_METRICS;
+import static org.apache.s4.util.MetricsName.generic_listener_msg_in_ct;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.processor.AsynchronousEventProcessor;
+import org.apache.s4.processor.PEContainer;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Event handler that counts incoming events, queues each one on the
+ * configured {@link AsynchronousEventProcessor} and, when a monitor is set,
+ * increments the listener metrics.
+ */
+public class EventListener implements EventHandler {
+    private static Logger logger = Logger.getLogger(EventListener.class);
+    /** Number of events seen; guarded by this object's monitor. */
+    private int eventCount = 0;
+    private AsynchronousEventProcessor eventProcessor;
+    private org.apache.s4.listener.EventListener rawListener;
+    private Monitor monitor;
+
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
+
+    /** Uses the PE container as the event processor. */
+    public void setPeContainer(PEContainer peContainer) {
+        this.eventProcessor = peContainer;
+    }
+
+    public void setEventProcessor(AsynchronousEventProcessor eventProcessor) {
+        this.eventProcessor = eventProcessor;
+    }
+
+    public void setRawListener(org.apache.s4.listener.EventListener rawListener) {
+        this.rawListener = rawListener;
+    }
+
+    public org.apache.s4.listener.EventListener getRawListener() {
+        return this.rawListener;
+    }
+
+    /**
+     * @return the number of events received so far. Synchronized on the
+     *         same monitor as the increment in {@link #processEvent}, so the
+     *         caller sees the latest count.
+     */
+    public synchronized int getEventCount() {
+        return eventCount;
+    }
+
+    public EventListener() {
+
+    }
+
+    /** Registers this handler with the raw listener. */
+    public void init() {
+        rawListener.addHandler(this);
+    }
+
+    /**
+     * Counts the event, queues it on the event processor and updates the
+     * metrics. Any exception is logged and not propagated.
+     *
+     * @param eventWrapper the received event
+     */
+    public void processEvent(EventWrapper eventWrapper) {
+        try {
+            synchronized (this) {
+                eventCount++;
+            }
+            if (logger.isDebugEnabled()) {
+                // Plain concatenation tolerates a null event, so turning on
+                // debug logging cannot stop the event from being queued.
+                logger.debug("STEP 3 (EventListener): peContainer.addEvent - "
+                        + eventWrapper.getEvent());
+            }
+            eventProcessor.queueWork(eventWrapper);
+
+            if (monitor != null) {
+                monitor.increment(generic_listener_msg_in_ct.toString(),
+                                  1,
+                                  S4_EVENT_METRICS.toString(),
+                                  "et",
+                                  eventWrapper.getStreamName());
+                monitor.increment(generic_listener_msg_in_ct.toString(),
+                                  1,
+                                  S4_CORE_METRICS.toString());
+            }
+        } catch (Exception e) {
+            logger.error("Exception in processEvent on thread "
+                    + Thread.currentThread().getId(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java b/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java
new file mode 100644
index 0000000..8dbb4ea
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.collector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A map of event data whose mutating {@link Map} operations all throw
+ * {@link UnsupportedOperationException}.
+ * <p>
+ * On construction the data is copied: nested maps become nested
+ * {@code EventRecord}s and nested lists become unmodifiable lists whose
+ * elements are converted the same way. List elements that are neither maps
+ * nor lists (strings, numbers, ...) are kept as they are. A separate,
+ * mutable set of "additional" properties can be attached to a record.
+ */
+public class EventRecord implements Map<String, Object> {
+
+    public static EventRecord EMPTY_RECORD = new EventRecord(new HashMap<String, Object>());
+    public static List<EventRecord> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<EventRecord>());
+
+    private Map<String, Object> eventData = new HashMap<String, Object>();
+    private Map<String, Object> additionalData = new HashMap<String, Object>();
+
+    /**
+     * Creates a record holding a converted copy of the given data.
+     *
+     * @param passedEventData the raw event data
+     */
+    public EventRecord(Map<String, Object> passedEventData) {
+        this(passedEventData, true);
+    }
+
+    /** With processEventData false the given map is adopted as is. */
+    private EventRecord(Map<String, Object> passedEventData,
+                        boolean processEventData) {
+        if (processEventData) {
+            eventData = processMap(passedEventData, true);
+        } else {
+            eventData = passedEventData;
+        }
+    }
+
+    /**
+     * Converts the values of a map.
+     *
+     * @param returnRaw true to return the plain copy, false to wrap it in an
+     *                  EventRecord
+     */
+    private Map<String, Object> processMap(Map<String, Object> inputMap, boolean returnRaw) {
+        Map<String, Object> processed = new HashMap<String, Object>();
+        for (Map.Entry<String, Object> entry : inputMap.entrySet()) {
+            processed.put(entry.getKey(), processValue(entry.getValue()));
+        }
+        if (returnRaw)
+            return processed;
+        return new EventRecord(processed, false);
+    }
+
+    /** Converts every element of a list; the result is unmodifiable. */
+    private List<Object> processList(List<?> inputList) {
+        List<Object> processed = new ArrayList<Object>(inputList.size());
+        for (Object element : inputList) {
+            processed.add(processValue(element));
+        }
+        return Collections.unmodifiableList(processed);
+    }
+
+    /**
+     * Converts one value: maps become records, lists are converted
+     * element by element, anything else is returned unchanged. Lists are
+     * not assumed to contain maps.
+     */
+    @SuppressWarnings("unchecked")
+    private Object processValue(Object value) {
+        if (value instanceof Map<?, ?>) {
+            return processMap((Map<String, Object>) value, false);
+        }
+        if (value instanceof List<?>) {
+            return processList((List<?>) value);
+        }
+        return value;
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return eventData.containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return eventData.containsValue(value);
+    }
+
+    @Override
+    public Set<java.util.Map.Entry<String, Object>> entrySet() {
+        return eventData.entrySet();
+    }
+
+    @Override
+    public Object get(Object key) {
+        return eventData.get(key);
+    }
+
+    /**
+     * Returns the value for a key, cast to the type of the default.
+     *
+     * @param key          the key to look up
+     * @param defaultValue returned (the same instance) when the key is
+     *                     absent or maps to null
+     * @return the stored value or {@code defaultValue}
+     */
+    public <T> T get(String key, T defaultValue) {
+        return get(key, defaultValue, eventData);
+    }
+
+    @SuppressWarnings("unchecked")
+    private <T> T get(String key, T defaultValue, Map<String, Object> map) {
+        Object value = map.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        // unchecked: the caller picks T, a mismatch fails at the use site
+        return (T) value;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return eventData.isEmpty();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return eventData.keySet();
+    }
+
+    @Override
+    public Object put(String key, Object value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void putAll(Map<? extends String, ? extends Object> m) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object remove(Object key) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+        return eventData.size();
+    }
+
+    @Override
+    public Collection<Object> values() {
+        return eventData.values();
+    }
+
+    /** Stores a property in the mutable side map, not in the event data. */
+    public void setAdditionalProperty(String key, Object value) {
+        additionalData.put(key, value);
+    }
+
+    /** Reads a property from the side map, or returns the default. */
+    public <T> T getAdditionalProperty(String key, T defaultValue) {
+        return get(key, defaultValue, additionalData);
+    }
+
+    public void removeAdditionalProperty(String key) {
+        additionalData.remove(key);
+    }
+
+    /**
+     * @return a deep, mutable copy of this record's event data made of
+     *         plain HashMaps and ArrayLists
+     */
+    public Map<String, Object> getMutableMap() {
+        return getMutableMap(this.eventData);
+    }
+
+    /**
+     * @param recordData the map to copy
+     * @return a deep, mutable copy of the given map
+     */
+    public Map<String, Object> getMutableMap(Map<String, Object> recordData) {
+        Map<String, Object> mutableData = new HashMap<String, Object>();
+        for (Map.Entry<String, Object> entry : recordData.entrySet()) {
+            mutableData.put(entry.getKey(), mutableCopy(entry.getValue()));
+        }
+        return mutableData;
+    }
+
+    /**
+     * @param recordList the list to copy
+     * @return a deep, mutable copy of the given list
+     */
+    @SuppressWarnings("unchecked")
+    public List<Map<String, Object>> getMutableList(List<Map<String, Object>> recordList) {
+        // unchecked: elements keep the shape the caller declared
+        return (List<Map<String, Object>>) (List<?>) mutableListCopy(recordList);
+    }
+
+    /** Deep-copies a list whose elements may be maps, lists or scalars. */
+    private List<Object> mutableListCopy(List<?> recordList) {
+        List<Object> mutableList = new ArrayList<Object>(recordList.size());
+        for (Object element : recordList) {
+            mutableList.add(mutableCopy(element));
+        }
+        return mutableList;
+    }
+
+    /** Deep-copies maps and lists; any other value is returned unchanged. */
+    @SuppressWarnings("unchecked")
+    private Object mutableCopy(Object value) {
+        if (value instanceof Map<?, ?>) {
+            return getMutableMap((Map<String, Object>) value);
+        }
+        if (value instanceof List<?>) {
+            return mutableListCopy((List<?>) value);
+        }
+        return value;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java b/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java
new file mode 100644
index 0000000..cc5b2a6
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file.
+ */
+package org.apache.s4.collector;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+/**
+ * Bundles an event object with the name of its stream and its key
+ * information. Keys are held either as {@link CompoundKeyInfo} objects or as
+ * lists of key-name elements parsed from slash-separated strings, depending
+ * on which constructor was used.
+ */
+public class EventWrapper {
+    private List<CompoundKeyInfo> compoundKeys;
+    private List<List<String>> compoundKeyNames;
+    private Object event;
+    private String streamName;
+
+    /**
+     * Creates a wrapper with an empty compound key list; the stream name,
+     * event and key names are left {@code null}.
+     */
+    public EventWrapper() {
+        this.compoundKeys = new ArrayList<CompoundKeyInfo>();
+    }
+
+    /**
+     * Creates a wrapper around already-built compound keys. The key names
+     * are left {@code null}.
+     *
+     * @param streamName
+     *            name of the stream the event belongs to
+     * @param event
+     *            the wrapped event
+     * @param compoundKeys
+     *            key information, stored as given (no copy is made)
+     */
+    public EventWrapper(String streamName, Object event,
+                        List<CompoundKeyInfo> compoundKeys) {
+        this.streamName = streamName;
+        this.event = event;
+        this.compoundKeys = compoundKeys;
+    }
+
+    /**
+     * Creates a wrapper whose key names are parsed from strings in which the
+     * elements are separated by {@code "/"}. The compound keys are left
+     * {@code null}.
+     *
+     * @param streamName
+     *            name of the stream the event belongs to
+     * @param compoundKeyStrings
+     *            one slash-separated string per key; when {@code null} the
+     *            key names stay {@code null} as well
+     * @param event
+     *            the wrapped event
+     */
+    public EventWrapper(String streamName, String[] compoundKeyStrings,
+                        Object event) {
+        this.streamName = streamName;
+        this.event = event;
+
+        if (compoundKeyStrings == null) {
+            return;
+        }
+
+        List<List<String>> parsedNames =
+                new ArrayList<List<String>>(compoundKeyStrings.length);
+        for (int i = 0; i < compoundKeyStrings.length; i++) {
+            parsedNames.add(splitKeyName(compoundKeyStrings[i]));
+        }
+        this.compoundKeyNames = parsedNames;
+    }
+
+    /**
+     * Splits one key string on {@code "/"}. Because a
+     * {@link StringTokenizer} is used, empty segments (leading, trailing or
+     * doubled slashes) produce no elements.
+     */
+    private static List<String> splitKeyName(String keyAsString) {
+        List<String> elements = new ArrayList<String>();
+        for (StringTokenizer tokens = new StringTokenizer(keyAsString, "/");
+                tokens.hasMoreTokens();) {
+            elements.add(tokens.nextToken());
+        }
+        return elements;
+    }
+
+    /** @return the compound keys; may be {@code null} */
+    public List<CompoundKeyInfo> getCompoundKeys() {
+        return compoundKeys;
+    }
+
+    /** @return the parsed key names, one list of elements per key; may be {@code null} */
+    public List<List<String>> getCompoundKeyNames() {
+        return compoundKeyNames;
+    }
+
+    /** @return the wrapped event */
+    public Object getEvent() {
+        return event;
+    }
+
+    /** @return the name of the stream the event belongs to */
+    public String getStreamName() {
+        return streamName;
+    }
+
+    /**
+     * Renders the stream name, keys, key names and event, in that order,
+     * using the values returned by the accessors.
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+        text.append("stream:").append(getStreamName());
+        text.append(" keys:").append(getCompoundKeys());
+        text.append(" keyNames:").append(getCompoundKeyNames());
+        text.append(" event:").append(getEvent());
+        return text.toString();
+    }
+}