You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC

[35/50] [abbrv] Rename packages in preparation for move to Apache

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java b/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java
new file mode 100644
index 0000000..8b57721
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/adapter/Adapter.java
@@ -0,0 +1,208 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.adapter;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.listener.EventListener;
+import org.apache.s4.listener.EventProducer;
+import org.apache.s4.util.S4Util;
+
+import java.io.File;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.log4j.Logger;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+
+public class Adapter implements EventHandler {
+    private static String coreHome = "../s4_core";
+
+    private EventDispatcher dispatcher;
+    private EventProducer[] eventListeners;
+    private String configFilename;
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public void setEventListeners(EventProducer[] eventListeners) {
+        this.eventListeners = eventListeners;
+        for (EventProducer eventListener : eventListeners) {
+            eventListener.addHandler(this);
+        }
+    }
+
+    public void setConfigFilename(String configFilename) {
+        this.configFilename = configFilename;
+    }
+
+    private volatile int eventCount = 0;
+    private volatile int rawEventCount = 0;
+
+    public Adapter() {
+
+    }
+
+    int counts[];
+
+    private boolean init = false;
+
+    public void init() {
+        synchronized (this) {
+            init = true;
+        }
+    }
+
+    public void processEvent(EventWrapper eventWrapper) {
+        try {
+            synchronized (this) {
+                if (!init) {
+                    return;
+                }
+                rawEventCount++;
+                eventCount++;
+            }
+            dispatcher.dispatchEvent(eventWrapper.getStreamName(),
+                                     eventWrapper.getEvent());
+        } catch (Exception e) {
+            Logger.getLogger("dispatcher").info("Exception adapting event", e);
+        }
+    }
+
+    public static void main(String args[]) {
+        Options options = new Options();
+
+        options.addOption(OptionBuilder.withArgName("corehome")
+                                       .hasArg()
+                                       .withDescription("core home")
+                                       .create("c"));
+
+        options.addOption(OptionBuilder.withArgName("instanceid")
+                                       .hasArg()
+                                       .withDescription("instance id")
+                                       .create("i"));
+
+        options.addOption(OptionBuilder.withArgName("configtype")
+                                       .hasArg()
+                                       .withDescription("configuration type")
+                                       .create("t"));
+
+        options.addOption(OptionBuilder.withArgName("userconfig")
+                                       .hasArg()
+                                       .withDescription("user-defined legacy data adapter configuration file")
+                                       .create("d"));
+
+        CommandLineParser parser = new GnuParser();
+        CommandLine commandLine = null;
+
+        try {
+            commandLine = parser.parse(options, args);
+        } catch (ParseException pe) {
+            System.err.println(pe.getLocalizedMessage());
+            System.exit(1);
+        }
+
+        int instanceId = -1;
+        if (commandLine.hasOption("i")) {
+            String instanceIdStr = commandLine.getOptionValue("i");
+            try {
+                instanceId = Integer.parseInt(instanceIdStr);
+            } catch (NumberFormatException nfe) {
+                System.err.println("Bad instance id: %s" + instanceIdStr);
+                System.exit(1);
+            }
+        }
+
+        if (commandLine.hasOption("c")) {
+            coreHome = commandLine.getOptionValue("c");
+        }
+
+        String configType = "typical";
+        if (commandLine.hasOption("t")) {
+            configType = commandLine.getOptionValue("t");
+        }
+
+        String userConfigFilename = null;
+        if (commandLine.hasOption("d")) {
+            userConfigFilename = commandLine.getOptionValue("d");
+        }
+
+        File userConfigFile = new File(userConfigFilename);
+        if (!userConfigFile.isFile()) {
+            System.err.println("Bad user configuration file: "
+                    + userConfigFilename);
+            System.exit(1);
+        }
+
+        File coreHomeFile = new File(coreHome);
+        if (!coreHomeFile.isDirectory()) {
+            System.err.println("Bad core home: " + coreHome);
+            System.exit(1);
+        }
+
+        if (instanceId > -1) {
+            System.setProperty("instanceId", "" + instanceId);
+        } else {
+            System.setProperty("instanceId", "" + S4Util.getPID());
+        }
+
+        String configBase = coreHome + File.separatorChar + "conf"
+                + File.separatorChar + configType;
+        String configPath = configBase + File.separatorChar
+                + "adapter-conf.xml";
+        File configFile = new File(configPath);
+        if (!configFile.exists()) {
+            System.err.printf("adapter config file %s does not exist\n",
+                              configPath);
+            System.exit(1);
+        }
+
+        // load adapter config xml
+        ApplicationContext coreContext;
+        coreContext = new FileSystemXmlApplicationContext("file:" + configPath);
+        ApplicationContext context = coreContext;
+
+        Adapter adapter = (Adapter) context.getBean("adapter");
+
+        ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
+                                                                                    + userConfigFilename },
+                                                                            context);
+        Map listenerBeanMap = appContext.getBeansOfType(EventProducer.class);
+        if (listenerBeanMap.size() == 0) {
+            System.err.println("No user-defined listener beans");
+            System.exit(1);
+        }
+        EventProducer[] eventListeners = new EventProducer[listenerBeanMap.size()];
+
+        int index = 0;
+        for (Iterator it = listenerBeanMap.keySet().iterator(); it.hasNext(); index++) {
+            String beanName = (String) it.next();
+            System.out.println("Adding producer " + beanName);
+            eventListeners[index] = (EventProducer) listenerBeanMap.get(beanName);
+        }
+
+        adapter.setEventListeners(eventListeners);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/Adapter.java b/s4-core/src/main/java/org/apache/s4/client/Adapter.java
new file mode 100644
index 0000000..19a024b
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/Adapter.java
@@ -0,0 +1,432 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.collector.EventListener;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.message.Request;
+import org.apache.s4.processor.AsynchronousEventProcessor;
+import org.apache.s4.util.S4Util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Logger;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+
+/**
+ * Adapter to connect an S4 cluster to an external client to read from and write
+ * into the S4 cluster.
+ */
+public class Adapter {
+    private static String coreHome = "../s4_core";
+
+    // Accept events from clients and send into S4 cluster.
+    private EventDispatcher dispatcher;
+    private Writer eventWriter = new Writer();
+
+    // Listen to events from S4 cluster and send to clients.
+    private org.apache.s4.collector.EventListener clusterEventListener;
+    private Reader eventReader = new Reader();
+
+    /**
+     * Set the dispatcher to use for sending events into S4.
+     * 
+     * @param dispatcher
+     */
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    /**
+     * clusterEventListener receives events from S4 cluster. Processing of
+     * events is delegated to the eventReader, which typically forwards them to
+     * appropriate clients.
+     * 
+     * @param clusterEventListener
+     */
+    public void setClusterEventListener(EventListener clusterEventListener) {
+        this.clusterEventListener = clusterEventListener;
+        this.clusterEventListener.setEventProcessor(eventReader);
+    }
+
+    public Adapter() {
+    }
+
+    int counts[];
+
+    private boolean init = false;
+
+    public void init() {
+        synchronized (this) {
+            init = true;
+        }
+    }
+
+    /**
+     * Register a list of InputStubs that will send events into the S4 cluster.
+     * These events will be processed by the eventWriter: i.e. the eventWriter
+     * dispatches them into the S4 cluster.
+     * 
+     * @param stubs
+     */
+    public void setInputStubs(List<InputStub> stubs) {
+        for (InputStub stub : stubs) {
+            // register the writer as the handler for the stub's events
+            stub.addHandler(eventWriter);
+        }
+    }
+
+    // collections of eventReceivers (OutputStubs) to which events will be sent
+    // for forwarding to clients.
+    HashMap<String, List<OutputStub>> eventReceivers = new HashMap<String, List<OutputStub>>();
+    List<OutputStub> eventReceiversAny = new ArrayList<OutputStub>();
+    List<OutputStub> eventReceiversAll = new ArrayList<OutputStub>();
+
+    /**
+     * Register a list of OutputStubs which process events received from the S4
+     * cluster, typically forwarding them to clients.
+     * 
+     * The {@link OutputStub#getAcceptedStreams()} is called to determine which
+     * streams the stub is interested in receiving. Accordingly, three
+     * collections of stubs are created.
+     * 
+     * <ol>
+     * <li>A mapping from stream name to OutputStubs (One-to-many). Used for
+     * routing.</li>
+     * <li>A list of OutputStubs that accept all streams (
+     * {@code stub.getAcceptedStreams() == null}. Used for routing.</li>
+     * <li>A list of all OutputStubs. This is used to iterate over all the stubs
+     * exactly once.</li>
+     * </ol>
+     * 
+     * @param stubs
+     *            the list of output stubs.
+     */
+    public void setOutputStubs(List<OutputStub> stubs) {
+        eventReceiversAll.addAll(stubs);
+
+        for (OutputStub stub : stubs) {
+            // update mapping of stream names to stubs that accept events on
+            // that stream.
+            List<String> streams = stub.getAcceptedStreams();
+            if (streams != null) {
+                for (String stream : streams) {
+                    List<OutputStub> stubList = eventReceivers.get(stream);
+                    if (stubList == null) {
+                        stubList = new ArrayList<OutputStub>();
+                        eventReceivers.put(stream, stubList);
+                    }
+
+                    stubList.add(stub);
+                }
+            } else {
+                eventReceiversAny.add(stub);
+            }
+        }
+    }
+
+    /**
+     * Write events from input stubs into S4 cluster.
+     */
+    private class Writer implements EventHandler {
+        // events to be dispatched into cluster
+        public void processEvent(EventWrapper eventWrapper) {
+            try {
+                synchronized (this) {
+                    if (!init) {
+                        return;
+                    }
+                    rawEventCount++;
+                    eventCount++;
+                }
+
+                // null keys => round-robin
+                // empty key list => default partitioning of underlying
+                // partitioner.
+                List<List<String>> keys = eventWrapper.getCompoundKeyNames();
+
+                String stream = eventWrapper.getStreamName();
+
+                Object event = eventWrapper.getEvent();
+
+                if (event instanceof Request)
+                    decorateRequest((Request) event);
+
+                dispatcher.dispatchEvent(stream, keys, event);
+
+            } catch (Exception e) {
+                Logger.getLogger("adapter").info("Exception adapting event", e);
+            }
+        }
+
+        private volatile int eventCount = 0;
+        private volatile int rawEventCount = 0;
+
+        // set the return stream name to the adapter cluster name
+        private void decorateRequest(Request r) {
+            Request.RInfo rinfo = r.getRInfo();
+
+            if (rinfo instanceof Request.ClientRInfo) {
+                String cname = clusterEventListener.getRawListener()
+                                                   .getAppName();
+
+                ((Request.ClientRInfo) rinfo).setStream("@" + cname);
+            }
+        }
+    }
+
+    /**
+     * Read events from S4 cluster.
+     */
+    private class Reader implements AsynchronousEventProcessor {
+        /**
+         * Queue work for processing by OutputStubs. This method simply queues
+         * the events in the appropriate stubs. An event from stream K is queued
+         * in OutputStub S if either S accepts all streams, or K is contained in
+         * the list {@code S.getAcceptedStreams()}.
+         */
+        @Override
+        public void queueWork(EventWrapper eventWrapper) {
+            List<OutputStub> stubs = eventReceivers.get(eventWrapper.getStreamName());
+
+            // stubs that accept any stream
+            for (OutputStub stub : eventReceiversAny)
+                stub.queueWork(eventWrapper);
+
+            // stubs that receive this stream in particular
+            if (stubs != null)
+                for (OutputStub stub : stubs)
+                    stub.queueWork(eventWrapper);
+        }
+
+        @Override
+        public int getQueueSize() {
+            int sz = 0;
+
+            for (OutputStub stub : eventReceiversAll)
+                sz += stub.getQueueSize();
+
+            return sz;
+        }
+
+    }
+
+    private static class TestDispatcher implements EventDispatcher {
+        @Override
+        public void dispatchEvent(String s, Object e) {
+            System.out.println("Dispatching event: " + s + ":" + e);
+        }
+
+        @Override
+        public void dispatchEvent(String s, List<List<String>> k, Object e) {
+            System.out.println("Dispatching event: " + s + ":" + k + ":" + e);
+        }
+    }
+
+    private static class TestReturnType {
+        private int ra;
+        private int rb;
+
+        @SuppressWarnings("unused")
+        TestReturnType() {
+        }
+
+        TestReturnType(int a, int b) {
+            this.ra = a;
+            this.rb = b;
+        }
+
+        public String toString() {
+            return "ra=" + ra + " rb=" + rb;
+        }
+    }
+
+    @SuppressWarnings("static-access")
+    public static void main(String args[]) throws IOException,
+            InterruptedException {
+
+        Options options = new Options();
+
+        options.addOption(OptionBuilder.withArgName("corehome")
+                                       .hasArg()
+                                       .withDescription("core home")
+                                       .create("c"));
+
+        options.addOption(OptionBuilder.withArgName("instanceid")
+                                       .hasArg()
+                                       .withDescription("instance id")
+                                       .create("i"));
+
+        options.addOption(OptionBuilder.withArgName("configtype")
+                                       .hasArg()
+                                       .withDescription("configuration type")
+                                       .create("t"));
+
+        options.addOption(OptionBuilder.withArgName("userconfig")
+                                       .hasArg()
+                                       .withDescription("user-defined legacy data adapter configuration file")
+                                       .create("d"));
+
+        CommandLineParser parser = new GnuParser();
+        CommandLine commandLine = null;
+
+        try {
+            commandLine = parser.parse(options, args);
+        } catch (ParseException pe) {
+            System.err.println(pe.getLocalizedMessage());
+            System.exit(1);
+        }
+
+        int instanceId = -1;
+        if (commandLine.hasOption("i")) {
+            String instanceIdStr = commandLine.getOptionValue("i");
+            try {
+                instanceId = Integer.parseInt(instanceIdStr);
+            } catch (NumberFormatException nfe) {
+                System.err.println("Bad instance id: %s" + instanceIdStr);
+                System.exit(1);
+            }
+        }
+
+        if (commandLine.hasOption("c")) {
+            coreHome = commandLine.getOptionValue("c");
+        }
+
+        String configType = "typical";
+        if (commandLine.hasOption("t")) {
+            configType = commandLine.getOptionValue("t");
+        }
+
+        String userConfigFilename = null;
+        if (commandLine.hasOption("d")) {
+            userConfigFilename = commandLine.getOptionValue("d");
+        }
+
+        File userConfigFile = new File(userConfigFilename);
+        if (!userConfigFile.isFile()) {
+            System.err.println("Bad user configuration file: "
+                    + userConfigFilename);
+            System.exit(1);
+        }
+
+        File coreHomeFile = new File(coreHome);
+        if (!coreHomeFile.isDirectory()) {
+            System.err.println("Bad core home: " + coreHome);
+            System.exit(1);
+        }
+
+        if (instanceId > -1) {
+            System.setProperty("instanceId", "" + instanceId);
+        } else {
+            System.setProperty("instanceId", "" + S4Util.getPID());
+        }
+
+        String configBase = coreHome + File.separatorChar + "conf"
+                + File.separatorChar + configType;
+        String configPath = configBase + File.separatorChar
+                + "client-adapter-conf.xml";
+        File configFile = new File(configPath);
+        if (!configFile.exists()) {
+            System.err.printf("adapter config file %s does not exist\n",
+                              configPath);
+            System.exit(1);
+        }
+
+        // load adapter config xml
+        ApplicationContext coreContext;
+        coreContext = new FileSystemXmlApplicationContext("file:" + configPath);
+        ApplicationContext context = coreContext;
+
+        Adapter adapter = (Adapter) context.getBean("client_adapter");
+
+        ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
+                                                                                    + userConfigFilename },
+                                                                            context);
+
+        Map<?, ?> inputStubBeanMap = appContext.getBeansOfType(InputStub.class);
+        Map<?, ?> outputStubBeanMap = appContext.getBeansOfType(OutputStub.class);
+
+        if (inputStubBeanMap.size() == 0 && outputStubBeanMap.size() == 0) {
+            System.err.println("No user-defined input/output stub beans");
+            System.exit(1);
+        }
+
+        ArrayList<InputStub> inputStubs = new ArrayList<InputStub>(inputStubBeanMap.size());
+        ArrayList<OutputStub> outputStubs = new ArrayList<OutputStub>(outputStubBeanMap.size());
+
+        // add all input stubs
+        for (Map.Entry<?, ?> e : inputStubBeanMap.entrySet()) {
+            String beanName = (String) e.getKey();
+            System.out.println("Adding InputStub " + beanName);
+            inputStubs.add((InputStub) e.getValue());
+        }
+
+        // add all output stubs
+        for (Map.Entry<?, ?> e : outputStubBeanMap.entrySet()) {
+            String beanName = (String) e.getKey();
+            System.out.println("Adding OutputStub " + beanName);
+            outputStubs.add((OutputStub) e.getValue());
+        }
+
+        adapter.setInputStubs(inputStubs);
+        adapter.setOutputStubs(outputStubs);
+
+    }
+
+    public static void clientTest() throws IOException, InterruptedException {
+        BasicConfigurator.configure();
+
+        TestDispatcher disp = new TestDispatcher();
+
+        Adapter adapter = new Adapter();
+        adapter.setDispatcher(disp);
+
+        GenericJsonClientStub stub = new GenericJsonClientStub();
+        stub.setConnectionPort(2334);
+
+        InputStub[] in = { stub };
+        OutputStub[] out = { stub };
+        adapter.setInputStubs(Arrays.asList(in));
+        adapter.setOutputStubs(Arrays.asList(out));
+
+        adapter.init();
+        stub.init();
+
+        while (true) {
+            Thread.sleep(10000);
+            TestReturnType r = new TestReturnType(100, 200);
+            adapter.eventReader.queueWork(new EventWrapper("TESTSTREAM",
+                                                           r,
+                                                           null));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java b/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java
new file mode 100644
index 0000000..388094a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientConnection.java
@@ -0,0 +1,163 @@
+package org.apache.s4.client;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.message.Request;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Connection to a client. A Stub has a collection of connections.
+ */
+public class ClientConnection {
+    /**
+     * 
+     */
+    private final ClientStub clientStub;
+
+    /**
+     * TCP/IP socket used to communicate with client.
+     */
+    private final Socket socket;
+
+    public final IOChannel io;
+
+    public final ClientReadMode clientReadMode;
+    public final ClientWriteMode clientWriteMode;
+
+    /**
+     * GUID of client.
+     */
+    public final UUID uuid;
+
+    public ClientConnection(ClientStub clientStub, Socket socket, UUID uuid,
+            ClientReadMode clientReadMode, ClientWriteMode clientWriteMode)
+            throws IOException {
+        this.clientStub = clientStub;
+        this.uuid = uuid;
+        this.socket = socket;
+        this.io = this.clientStub.createIOChannel(socket);
+        this.clientReadMode = clientReadMode;
+        this.clientWriteMode = clientWriteMode;
+    }
+
+    public boolean good() {
+        return socket.isConnected();
+    }
+
+    public void close() {
+        synchronized (this.clientStub.clients) {
+            ClientStub.logger.info("closing connection to client " + uuid);
+            this.clientStub.clients.remove(this.uuid);
+        }
+
+        try {
+            socket.close();
+        } catch (IOException e) {
+            ClientStub.logger.error("problem closing client connection to client "
+                                            + uuid,
+                                    e);
+        }
+    }
+
+    private HashSet<String> includeStreams = new HashSet<String>();
+    private HashSet<String> excludeStreams = new HashSet<String>();
+
+    public void includeStreams(List<String> streams) {
+        includeStreams.addAll(streams);
+    }
+
+    public void excludeStreams(List<String> streams) {
+        excludeStreams.addAll(streams);
+    }
+
+    /**
+     * Stream is accepted if and only if:
+     * 
+     * (A) readMode is Select and: 1. indifferent to stream inclusion OR stream
+     * is included AND 2. indifferent to stream exclusion OR stream is not
+     * excluded.
+     * 
+     * OR (B) readMode is All
+     * 
+     * @return true if and only if stream is accepted
+     */
+    public boolean streamAccepted(String s) {
+        switch (clientReadMode) {
+            case None:
+            case Private:
+                return false;
+
+            case Select:
+                return (includeStreams.isEmpty() || includeStreams.contains(s))
+                        && (excludeStreams.isEmpty() || !excludeStreams.contains(s));
+
+            case All:
+                return true;
+
+            default:
+                return false;
+        }
+    }
+
+    private Thread receiverThread = null;
+
+    public void start() {
+        if (clientWriteMode == ClientWriteMode.Enabled)
+            (receiverThread = new Thread(receiver)).start();
+    }
+
+    public void stop() {
+        if (receiverThread != null) {
+            receiverThread.interrupt();
+            receiverThread = null;
+        }
+    }
+
+    public final Runnable receiver = new Runnable() {
+        public void run() {
+            try {
+                while (good()) {
+                    byte[] b = io.recv();
+
+                    // null, empty => goodbye
+                    if (b == null || b.length == 0) {
+                        ClientStub.logger.info("client session ended " + uuid);
+                        break;
+                    }
+
+                    EventWrapper ew = ClientConnection.this.clientStub.eventWrapperFromBytes(b);
+                    if (ew == null)
+                        continue;
+
+                    Object event = ew.getEvent();
+                    if (event instanceof Request) {
+                        decorateRequest((Request) event);
+                        ClientStub.logger.info("Decorated client request: "
+                                + ew.toString());
+                    }
+
+                    ClientConnection.this.clientStub.injectEvent(ew);
+                }
+            } catch (IOException e) {
+                ClientStub.logger.info("error while reading from client "
+                        + uuid, e);
+
+            } finally {
+                close();
+            }
+        }
+
+        private void decorateRequest(Request r) {
+            // add UUID of client into request.
+            Request.RInfo info = r.getRInfo();
+
+            if (info != null && info instanceof Request.ClientRInfo)
+                ((Request.ClientRInfo) info).setRequesterUUID(ClientConnection.this.uuid);
+        }
+    };
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java b/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java
new file mode 100644
index 0000000..03df61a
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientReadMode.java
@@ -0,0 +1,37 @@
+package org.apache.s4.client;
+
+/**
+ * Client mode.
+ */
+public enum ClientReadMode {
+    None(false, false), Private(true, false), Select(true, true), All(true, true);
+
+    private final boolean priv;
+    private final boolean pub;
+
+    ClientReadMode(boolean priv, boolean pub) {
+        this.priv = priv;
+        this.pub = pub;
+    }
+
+    public boolean takePublic() {
+        return pub;
+    }
+
+    public boolean takePrivate() {
+        return priv;
+    }
+
+    public static ClientReadMode fromString(String s) {
+        if (s.equalsIgnoreCase("none"))
+            return None;
+        else if (s.equalsIgnoreCase("private"))
+            return Private;
+        else if (s.equalsIgnoreCase("select"))
+            return Select;
+        else if (s.equalsIgnoreCase("all"))
+            return All;
+        else
+            return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientStub.java b/s4-core/src/main/java/org/apache/s4/client/ClientStub.java
new file mode 100644
index 0000000..f1acd44
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientStub.java
@@ -0,0 +1,312 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.message.Request;
+import org.apache.s4.message.Response;
+import org.apache.s4.util.ByteArrayIOChannel;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+public abstract class ClientStub implements OutputStub, InputStub {
+
+    protected static final Logger logger = Logger.getLogger("adapter");
+
+    /**
+     * Description of the protocol implemented by a concrete instance of this
+     * stub.
+     */
+    public static class Info {
+        public final String name;
+        public final int versionMajor;
+        public final int versionMinor;
+
+        public Info(String name, int versionMajor, int versionMinor) {
+            this.name = name;
+            this.versionMajor = versionMajor;
+            this.versionMinor = versionMinor;
+        }
+    }
+
+    /**
+     * Meta-information about the protocol that this stub uses to talk to
+     * external clients.
+     * 
+     * This is sent to the client as a part of the handshake.
+     */
+    abstract public Info getProtocolInfo();
+
+    /**
+     * Stream names that are accepted by this stub to be forwarded to its
+     * clients.
+     */
+    @Override
+    public List<String> getAcceptedStreams() {
+        return null;
+    }
+
+    private List<EventHandler> handlers = new ArrayList<EventHandler>();
+
+    /**
+     * A handler that can inject events produced by this stub into the S4
+     * cluster.
+     */
+    @Override
+    public void addHandler(EventHandler handler) {
+        this.handlers.add(handler);
+    }
+
+    /**
+     * Remove a handler.
+     */
+    @Override
+    public boolean removeHandler(EventHandler handler) {
+        return handlers.remove(handler);
+    }
+
+    /**
+     * Convert an array of bytes into an event wrapper. This method is used to
+     * translate data received from a client into events that may be injected
+     * into the S4 cluster.
+     * 
+     * @param v
+     *            array of bytes
+     * @return EventWrapper constructed from the byte array.
+     */
+    abstract public EventWrapper eventWrapperFromBytes(byte[] v);
+
+    /**
+     * Convert an event wrapper into a byte array. Events received from the S4
+     * cluster for dispatching to a client are translated into a byte array
+     * using this method.
+     * 
+     * @param e
+     *            an {@link EventWrapper}
+     * @return a byte array
+     */
+    abstract public byte[] bytesFromEventWrapper(EventWrapper e);
+
+    /**
+     * Construct an I/O channel over which the stub can communicate with a
+     * client. The channel allows arrys of bytes to be exchanged between the
+     * stub and client.
+     * 
+     * @param socket
+     *            TCP/IP socket
+     * @return an IO Channel to send and recv byte arrays
+     * @throws IOException
+     *             if the underlying socket could not provide valid input and
+     *             output streams.
+     */
+    public IOChannel createIOChannel(Socket socket) throws IOException {
+        return new ByteArrayIOChannel(socket);
+    }
+
+    // send an event into the cluster via adapter.
+    void injectEvent(EventWrapper e) {
+        for (EventHandler handler : handlers) {
+            handler.processEvent(e);
+        }
+    }
+
+    // private List<ClientConnection> clients = new
+    // ArrayList<ClientConnection>();
+    HashMap<UUID, ClientConnection> clients = new HashMap<UUID, ClientConnection>();
+
+    /**
+     * Create a client connection and add it to list of clients.
+     * 
+     * @param socket
+     *            client's I/O socket
+     */
+    private void addClient(ClientConnection c) {
+        synchronized (clients) {
+            logger.info("adding client " + c.uuid);
+            clients.put(c.uuid, c);
+        }
+    }
+
+    LinkedBlockingQueue<EventWrapper> queue = new LinkedBlockingQueue<EventWrapper>();
+
+    @Override
+    public int getQueueSize() {
+        return queue.size();
+    }
+
+    @Override
+    public void queueWork(EventWrapper e) {
+        queue.offer(e);
+    }
+
+    ServerSocket serverSocket = null;
+
+    public void setConnectionPort(int port) throws IOException {
+        serverSocket = new ServerSocket(port);
+    }
+
+    private Thread acceptThread = null;
+    private Thread senderThread = null;
+
+    public void init() {
+        // start accepting new clients and sending events to them
+        (acceptThread = new Thread(connectionListener)).start();
+        (senderThread = new Thread(sender)).start();
+    }
+
+    public void shutdown() {
+        // stop accepting new clients
+        if (acceptThread != null) {
+            acceptThread.interrupt();
+            acceptThread = null;
+        }
+
+        // stop sending events to them.
+        if (senderThread != null) {
+            senderThread.interrupt();
+            senderThread = null;
+        }
+
+        // stop all connected clients.
+        List<ClientConnection> clientCopy = new ArrayList<ClientConnection>(clients.values());
+        for (ClientConnection c : clientCopy) {
+            c.stop();
+            c.close();
+        }
+    }
+
+    private final Runnable connectionListener = new Runnable() {
+
+        Handshake handshake = null;
+
+        public void run() {
+            if (handshake == null)
+                handshake = new Handshake(ClientStub.this);
+
+            try {
+                while (serverSocket != null && serverSocket.isBound()
+                        && !Thread.currentThread().isInterrupted()) {
+
+                    Socket socket = serverSocket.accept();
+
+                    ClientConnection connection = handshake.execute(socket);
+
+                    if (connection != null) {
+                        addClient(connection);
+                        connection.start();
+                    }
+
+                }
+            } catch (IOException e) {
+                logger.info("exception in client connection listener", e);
+            }
+        }
+
+    };
+
+    public final Runnable sender = new Runnable() {
+        ArrayList<ClientConnection> disconnect = new ArrayList<ClientConnection>();
+
+        public void run() {
+
+            while (!Thread.currentThread().isInterrupted()) {
+                try {
+                    EventWrapper event = queue.take();
+
+                    // Responses need special handling.
+                    if (event.getEvent() instanceof Response) {
+                        dispatchResponse(event);
+                        continue;
+                    }
+
+                    // TODO: include check to see if the event belongs to a
+                    // particular client.
+
+                    dispatchToAllClients(event);
+
+                } catch (InterruptedException e) {
+                    return;
+                }
+            }
+        }
+
+        private void dispatchToAllClients(EventWrapper event) {
+
+            byte[] b = bytesFromEventWrapper(event);
+            String stream = event.getStreamName();
+            
+            synchronized (clients) {
+                for (ClientConnection c : clients.values()) {
+                    if (c.good() && c.streamAccepted(stream)) {
+                        try {
+                            c.io.send(b);
+
+                        } catch (IOException e) {
+                            logger.error("error sending message to client "
+                                    + c.uuid + ". disconnecting", e);
+
+                            disconnect.add(c);
+                        }
+                    }
+                }
+            }
+
+            if (disconnect.size() > 0) {
+                for (ClientConnection d : disconnect)
+                    d.close();
+
+                disconnect.clear();
+            }
+        }
+
+        private void dispatchResponse(EventWrapper event) {
+            Response res = (Response) event.getEvent();
+            Request.RInfo rinfo = res.getRInfo();
+
+            if (rinfo instanceof Request.ClientRInfo) {
+                UUID uuid = ((Request.ClientRInfo) rinfo).getRequesterUUID();
+
+                ClientConnection c = clients.get(uuid);
+
+                if (c != null && c.good() && c.clientReadMode.takePrivate()) {
+                    try {
+                        byte[] b = bytesFromEventWrapper(event);
+                        if (b != null) c.io.send(b);
+
+                    } catch (IOException e) {
+                        logger.error("error sending response to client "
+                                + c.uuid + ". disconnecting", e);
+
+                        c.close();
+                    }
+
+                } else {
+                    logger.warn("no active client found for response: " + res);
+                }
+            }
+        }
+    };
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java b/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java
new file mode 100644
index 0000000..e856ea7
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/ClientWriteMode.java
@@ -0,0 +1,17 @@
+package org.apache.s4.client;
+
+/**
+ * Client's write mode.
+ */
+public enum ClientWriteMode {
+    Enabled, Disabled;
+
+    public static ClientWriteMode fromString(String s) {
+        if (s.equalsIgnoreCase("enabled"))
+            return Enabled;
+        else if (s.equalsIgnoreCase("disabled"))
+            return Disabled;
+        else
+            return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java b/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java
new file mode 100644
index 0000000..6268407
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/GenericJsonClientStub.java
@@ -0,0 +1,107 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.client.util.ObjectBuilder;
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.util.GsonUtil;
+
+import java.nio.charset.Charset;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class GenericJsonClientStub extends ClientStub {
+
+    // private static final ObjectBuilder builder = new ObjectBuilder();
+    // private static final Gson builder = new Gson();
+
+    private static final Info protocolInfo = new Info("generic-json", 1, 0);
+
+    @Override
+    public Info getProtocolInfo() {
+        return protocolInfo;
+    }
+
+    @Override
+    public EventWrapper eventWrapperFromBytes(byte[] v) {
+        try {
+            // interpret v as a JSON string
+            String s = new String(v, Charset.forName("UTF8"));
+            JSONObject json = new JSONObject(s);
+
+            String streamName = json.getString("stream");
+            String className = json.getString("class");
+
+            Class<?> clazz;
+            try {
+                clazz = Class.forName(className);
+            } catch (ClassNotFoundException e) {
+                throw new ObjectBuilder.Exception("bad class name for json-encoded object: "
+                                                          + className,
+                                                  e);
+            }
+
+            String[] keyNames = null;
+            JSONArray keyArray = json.optJSONArray("keys");
+            if (keyArray != null) {
+                keyNames = new String[keyArray.length()];
+                for (int i = 0; i < keyNames.length; ++i) {
+                    keyNames[i] = keyArray.optString(i);
+                }
+            }
+
+            String jevent = json.getString("object");
+
+            Object obj = GsonUtil.get().fromJson(jevent, clazz);
+
+            return new EventWrapper(streamName, keyNames, obj);
+
+        } catch (JSONException e) {
+            logger.error("problem with event JSON", e);
+        } catch (ObjectBuilder.Exception e) {
+            logger.error("failed to build object from JSON", e);
+        }
+
+        return null;
+    }
+
+    @Override
+    public byte[] bytesFromEventWrapper(EventWrapper ew) {
+        JSONObject jevent = new JSONObject();
+
+        Object obj = ew.getEvent();
+
+        try {
+            jevent.put("stream", ew.getStreamName());
+            jevent.put("class", obj.getClass().getName());
+            jevent.put("object", GsonUtil.get().toJson(obj));
+
+            return jevent.toString().getBytes(Charset.forName("UTF8"));
+
+        } catch (JSONException e) {
+            logger.error("exception while converting event wrapper to bytes.",
+                         e);
+            return null;
+        } catch (IllegalArgumentException iae) {
+        	logger.error("exception while converting event wrapper to bytes.",
+                    iae);
+        	logger.error(obj);
+        	return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/Handshake.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/Handshake.java b/s4-core/src/main/java/org/apache/s4/client/Handshake.java
new file mode 100644
index 0000000..e149006
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/Handshake.java
@@ -0,0 +1,223 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.client.ClientStub.Info;
+import org.apache.s4.util.ByteArrayIOChannel;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.log4j.Logger;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.gson.Gson;
+
+public class Handshake {
+    private final ClientStub clientStub;
+
+    protected static final Logger logger = Logger.getLogger("adapter");
+
+    protected final Gson gson = new Gson();
+
+    public Handshake(ClientStub clientStub) {
+        this.clientStub = clientStub;
+    }
+
+    // execute handshake with client.
+    // 1. discovery: issue uuid;
+    // 2. main connect: create connection
+    public ClientConnection execute(Socket s) {
+        byte[] v;
+
+        try {
+            ClientConnection conn = null;
+
+            ByteArrayIOChannel io = new ByteArrayIOChannel(s);
+
+            v = io.recv();
+
+            if (v == null || v.length == 0) {
+                // no information => client initialization
+                clientInit(io);
+
+            } else {
+                // some data => client connect
+                conn = clientConnect(v, io, s);
+            }
+
+            if (conn == null)
+                s.close();
+
+            return conn;
+
+        } catch (IOException e) {
+            logger.error("exception during handshake", e);
+            try {
+                s.close();
+                return null;
+
+            } catch (IOException ee) {
+                throw new RuntimeException("failed to close socket after failed handshake",
+                                           ee);
+            }
+        }
+    }
+
+    private ClientConnection clientConnect(byte[] v, ByteArrayIOChannel io,
+                                           Socket sock) throws IOException {
+
+        List<String> reason = new ArrayList<String>(1);
+        ClientConnection conn = clientConnectCreate(v, io, sock, reason);
+
+        String message = null;
+        try {
+            JSONObject resp = new JSONObject();
+
+            resp.put("status", (conn != null ? "ok" : "failed"));
+
+            if (conn == null && !reason.isEmpty()) {
+                resp.put("reason", reason.get(0));
+            }
+
+            message = resp.toString();
+
+        } catch (JSONException e) {
+            logger.error("error creating response during connect.", e);
+            return null;
+        }
+
+        io.send(message.getBytes());
+
+        return conn;
+    }
+
+    private ClientConnection clientConnectCreate(byte[] v,
+                                                 ByteArrayIOChannel io,
+                                                 Socket sock,
+                                                 List<String> reason)
+            throws IOException {
+
+        try {
+            JSONObject cInfo = new JSONObject(new String(v));
+
+            String s = cInfo.optString("uuid", "");
+            if (s.isEmpty()) {
+                logger.error("missing client identifier during handshake.");
+                reason.add("missing UUID");
+                return null;
+            }
+
+            UUID u = UUID.fromString(s);
+
+            logger.info("connecting to client " + u);
+
+            s = cInfo.optString("readMode", "Private");
+            ClientReadMode rmode = ClientReadMode.fromString(s);
+            if (rmode == null) {
+                logger.error(u + ": unknown readMode " + s);
+                reason.add("unknown readMode " + s);
+                return null;
+
+            }
+
+            s = cInfo.optString("writeMode", "Enabled");
+            ClientWriteMode wmode = ClientWriteMode.fromString(s);
+            if (wmode == null) {
+                logger.error(u + ": unknown writeMode " + s);
+                reason.add("unknown writeMode " + s);
+                return null;
+            }
+
+            logger.info(u + " read=" + rmode + " write=" + wmode);
+
+            if (rmode == ClientReadMode.None
+                    && wmode == ClientWriteMode.Disabled) {
+                // client cannot disable read AND write...
+                logger.error("client neither reads nor writes.");
+                reason.add("read and write disabled");
+
+                return null;
+            }
+
+            ClientConnection conn = new ClientConnection(clientStub, sock, u, rmode, wmode);
+
+            if (rmode == ClientReadMode.Select) {
+                JSONArray incl = cInfo.optJSONArray("readInclude");
+                JSONArray excl = cInfo.optJSONArray("readExclude");
+
+                if (incl == null && excl == null) {
+                    logger.error(u + ": missing stream selection information");
+                    reason.add("missing readInclude and readExclude");
+                    return null;
+                }
+
+                if (incl != null) {
+                    List<String> streams = new ArrayList<String>(incl.length());
+                    for (int i = 0; i < incl.length(); ++i)
+                        streams.add(incl.getString(i));
+                    
+                    conn.includeStreams(streams);
+                }
+                
+                if (excl != null) {
+                    List<String> streams = new ArrayList<String>(excl.length());
+                    for (int i = 0; i < excl.length(); ++i)
+                        streams.add(excl.getString(i));
+                    
+                    conn.excludeStreams(streams);
+                }
+                
+            }
+
+            return conn;
+            
+        } catch (JSONException e) {
+            logger.error("malformed JSON from client during handshake", e);
+            reason.add("malformed JSON");
+
+        } catch (NumberFormatException e) {
+            logger.error("received malformed UUID", e);
+            reason.add("malformed UUID");
+
+        } catch (IllegalArgumentException e) {
+            logger.error("received malformed UUID", e);
+            reason.add("malformed UUID");
+        }
+
+        return null;
+    }
+
+    private void clientInit(ByteArrayIOChannel io) throws IOException {
+        String uuid = UUID.randomUUID().toString();
+        Info proto = clientStub.getProtocolInfo();
+
+        HashMap<String, Object> info = new HashMap<String, Object>();
+        info.put("uuid", uuid);
+        info.put("protocol", proto);
+
+        String response = gson.toJson(info) + "\n";
+
+        io.send(response.getBytes());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/IOChannel.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/IOChannel.java b/s4-core/src/main/java/org/apache/s4/client/IOChannel.java
new file mode 100644
index 0000000..e5c0414
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/IOChannel.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import java.io.IOException;
+
+public interface IOChannel {
+    byte[] recv() throws IOException;
+
+    void send(byte[] v) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/InputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/InputStub.java b/s4-core/src/main/java/org/apache/s4/client/InputStub.java
new file mode 100644
index 0000000..63ca95f
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/InputStub.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.listener.EventProducer;
+
+public interface InputStub extends EventProducer {
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/OutputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/OutputStub.java b/s4-core/src/main/java/org/apache/s4/client/OutputStub.java
new file mode 100644
index 0000000..59709d8
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/OutputStub.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.processor.AsynchronousEventProcessor;
+
+import java.util.List;
+
+public interface OutputStub extends AsynchronousEventProcessor {
+
+    List<String> getAcceptedStreams();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
new file mode 100644
index 0000000..bfb6eeb
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/client/util/ObjectBuilder.java
@@ -0,0 +1,200 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client.util;
+
+import org.apache.s4.message.Request;
+import org.apache.s4.message.SinglePERequest;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.InstanceCreator;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+import flexjson.JSONDeserializer;
+import flexjson.JSONSerializer;
+
+public class ObjectBuilder {
+
+    private ConcurrentHashMap<Class<?>, JSONDeserializer<Object>> deserializers = new ConcurrentHashMap<Class<?>, JSONDeserializer<Object>>();
+
+    private JSONSerializer serializer = (new JSONSerializer()).exclude("class");
+
+    // private JSONSerializer serializer = (new JSONSerializer());
+
+    public Object fromJson(String jevent, Class<?> clazz)
+            throws ObjectBuilder.Exception {
+
+        JSONDeserializer<Object> deser = deserializers.get(clazz);
+
+        if (deser == null) {
+            JSONDeserializer<Object> newDeser = new JSONDeserializer<Object>();
+            newDeser.use(null, clazz);
+
+            deser = deserializers.putIfAbsent(clazz, newDeser);
+
+            if (deser == null)
+                deser = newDeser;
+        }
+
+        return deser.deserialize(jevent);
+
+    }
+
+    public String toJson(Object e) {
+        return serializer.serialize(e);
+    }
+
+    public static class Exception extends java.lang.Exception {
+        public Exception(String message) {
+            super(message);
+        }
+
+        public Exception(String message, Throwable cause) {
+            super(message, cause);
+        }
+    }
+
+    private static class TEST {
+        private int a;
+        private int b;
+
+        public void setA(int a) {
+            this.a = a * 10;
+        }
+
+        public String toString() {
+            return "" + a + " " + b;
+        }
+
+        public int getA() {
+            return a;
+        }
+
+        public int getB() {
+            return b;
+        }
+
+        public TEST(int a, int b) {
+            this.a = a * 10;
+            this.b = b * 10;
+        }
+
+        public TEST() {
+        }
+    }
+
+    public static void main(String[] argv) throws Exception {
+
+        ObjectBuilder b = new ObjectBuilder();
+
+        String s = "{a:5, b:100}";
+        Object out = b.fromJson(s, TEST.class);
+
+        System.out.println(out.toString());
+
+        TEST t = new TEST(1, 2);
+
+        System.out.println(b.toJson(t));
+
+        String[] query = { "name", "count", "freq" };
+        String target[] = { "ACDW", "11" };
+
+        org.apache.s4.message.Request.ClientRInfo rinfo = new io.s4.message.Request.ClientRInfo();
+        rinfo.setRequesterUUID(UUID.randomUUID());
+        Request req = new org.apache.s4.message.SinglePERequest(Arrays.asList(target),
+                                                        Arrays.asList(query),
+                                                        rinfo);
+
+        System.out.println(req.toString());
+
+        InstanceCreator<org.apache.s4.message.Request.RInfo> infoCreator = new InstanceCreator<io.s4.message.Request.RInfo>() {
+            public org.apache.s4.message.Request.RInfo createInstance(Type type) {
+                return new org.apache.s4.message.Request.ClientRInfo();
+            }
+        };
+
+        Gson gson = (new GsonBuilder()).registerTypeAdapter(org.apache.s4.message.Request.RInfo.class,
+                                                            infoCreator)
+                                       .registerTypeAdapter(Object.class,
+                                                            new ObjectTypeAdapter())
+                                       .create();
+
+        System.out.println("gson: " + gson.toJson(req));
+        System.out.println("gson reversed: "
+                + gson.fromJson(gson.toJson(req), SinglePERequest.class));
+
+        System.out.println(b.toJson(req));
+        System.out.println(b.toJson(Arrays.asList(query)));
+
+        System.out.println("----------------------------------------------");
+
+        ArrayList<SSTest> list = new ArrayList<SSTest>();
+
+        SSTest ss1 = new SSTest();
+        ss1.str = "list-element-1";
+        SSTest ss2 = new SSTest();
+        ss2.str = "list-element-2";
+
+        list.add(ss1);
+        list.add(ss2);
+
+        Map<String, Object> listmap = new HashMap<String, Object>();
+        listmap.put("ll", list);
+
+        MapTest mt = new MapTest();
+        mt.map = listmap;
+
+        Object listmapobj = listmap;
+
+        System.out.println("list: " + gson.toJson(list));
+        System.out.println("listmap: " + gson.toJson(listmap));
+        System.out.println("listmapobj: " + gson.toJson(listmapobj));
+        System.out.println("mapobject: " + gson.toJson(mt));
+    }
+
+    private static class SSTest {
+        public String str;
+    }
+
+    private static class MapTest {
+        Map<String, Object> map;
+        Map gmap;
+    }
+
+    private static class ObjectTypeAdapter implements JsonSerializer<Object> {
+        public JsonElement serialize(Object src, Type typeOfSrc,
+                                     JsonSerializationContext context) {
+
+            if (src.getClass() != Object.class) {
+                return context.serialize(src, src.getClass());
+            }
+
+            return new JsonObject();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/Event.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/Event.java b/s4-core/src/main/java/org/apache/s4/collector/Event.java
new file mode 100644
index 0000000..404c724
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/Event.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.collector;
+
+import org.apache.s4.dispatcher.Dispatcher;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
+import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class Event extends EventRecord {
+    private String eventName;
+    private long timestamp;
+    private List<CompoundKeyInfo> compoundKeys = new ArrayList<CompoundKeyInfo>();
+    private boolean debug = false;
+    public static final String EVENT_NAME_KEY = "S4__eventName";
+    public static final String TIMESTAMP_KEY = "S4__timestamp";
+
+    public void setDebug(boolean debug) {
+        this.debug = debug;
+    }
+
+    public Event(Map<String, Object> passedEventData) {
+        super(passedEventData);
+
+        eventName = this.get(EVENT_NAME_KEY, "unknown");
+        timestamp = this.get(TIMESTAMP_KEY, -1L);
+
+        List<EventRecord> plainCompoundKeyList = null;
+        if ((plainCompoundKeyList = get(Dispatcher.PARTITION_INFO_KEY,
+                                        EMPTY_LIST)) != EMPTY_LIST) {
+            for (EventRecord plainCompoundKeyInfo : plainCompoundKeyList) {
+                CompoundKeyInfo compoundKeyInfo = new CompoundKeyInfo();
+                compoundKeyInfo.setCompoundValue(plainCompoundKeyInfo.get("compoundValue",
+                                                                          (String) null));
+                compoundKeyInfo.setCompoundKey(plainCompoundKeyInfo.get("compoundKey",
+                                                                        (String) null));
+                compoundKeys.add(compoundKeyInfo);
+                for (EventRecord plainKeyInfo : plainCompoundKeyInfo.get("keyInfoList",
+                                                                         EMPTY_LIST)) {
+                    KeyInfo keyInfo = new KeyInfo();
+                    for (EventRecord plainKeyPathElement : plainKeyInfo.get("keyPathElementList",
+                                                                            EMPTY_LIST)) {
+                        String keyName = plainKeyPathElement.get("keyName",
+                                                                 (String) null);
+                        Integer index = plainKeyPathElement.get("index",
+                                                                (Integer) null);
+
+                        if (keyName != null) {
+                            keyInfo.addElementToPath(keyName);
+                        } else if (index != null) {
+                            keyInfo.addElementToPath(index);
+                        }
+                    }
+                    compoundKeyInfo.addKeyInfo(keyInfo);
+                }
+            }
+        }
+        if (debug) {
+            for (CompoundKeyInfo compoundKeyInfo : compoundKeys) {
+                System.out.println("CompoundKey: "
+                        + compoundKeyInfo.getCompoundValue());
+                for (KeyInfo keyInfo : compoundKeyInfo.getKeyInfoList()) {
+                    String keyPath = "";
+                    for (KeyPathElement keyPathElement : keyInfo.getKeyPath()) {
+                        if (keyPathElement instanceof KeyPathElementIndex) {
+                            keyPath += "["
+                                    + ((KeyPathElementIndex) keyPathElement).getIndex()
+                                    + "]";
+                        } else {
+                            if (keyPath.length() > 0) {
+                                keyPath += "/";
+                            }
+                            keyPath += ((KeyPathElementName) keyPathElement).getKeyName();
+                        }
+                    }
+                    System.out.println("   " + keyPath);
+                }
+            }
+        }
+    }
+
+    public List<CompoundKeyInfo> getCompoundKeys() {
+        return compoundKeys;
+    }
+
+    public String getEventName() {
+        return eventName;
+    }
+
+    public long getTimeStamp() {
+        return timestamp;
+    }
+
+    public List<Map<String, Object>> getCompoundKeyList() {
+        return get(Dispatcher.PARTITION_INFO_KEY,
+                   new ArrayList<Map<String, Object>>());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/EventListener.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/EventListener.java b/s4-core/src/main/java/org/apache/s4/collector/EventListener.java
new file mode 100644
index 0000000..721d600
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/EventListener.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.collector;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+import static org.apache.s4.util.MetricsName.S4_EVENT_METRICS;
+import static org.apache.s4.util.MetricsName.generic_listener_msg_in_ct;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.processor.AsynchronousEventProcessor;
+import org.apache.s4.processor.PEContainer;
+
+import org.apache.log4j.Logger;
+
+public class EventListener implements EventHandler {
+    private static Logger logger = Logger.getLogger(EventListener.class);
+    private int eventCount = 0;
+    private AsynchronousEventProcessor eventProcessor;
+    private org.apache.s4.listener.EventListener rawListener;
+    private Monitor monitor;
+
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
+
+    public void setPeContainer(PEContainer peContainer) {
+        this.eventProcessor = peContainer;
+    }
+
+    public void setEventProcessor(AsynchronousEventProcessor eventProcessor) {
+        this.eventProcessor = eventProcessor;
+    }
+
+    public void setRawListener(org.apache.s4.listener.EventListener rawListener) {
+        this.rawListener = rawListener;
+    }
+
+    public org.apache.s4.listener.EventListener getRawListener() {
+        return this.rawListener;
+    }
+
+    public int getEventCount() {
+        return eventCount;
+    }
+
+    public EventListener() {
+
+    }
+
+    public void init() {
+        rawListener.addHandler(this);
+    }
+
+    public void processEvent(EventWrapper eventWrapper) {
+        try {
+            synchronized (this) {
+                eventCount++;
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("STEP 3 (EventListener): peContainer.addEvent - "
+                        + eventWrapper.getEvent().toString());
+            }
+            eventProcessor.queueWork(eventWrapper);
+
+            if (monitor != null) {
+                monitor.increment(generic_listener_msg_in_ct.toString(),
+                                  1,
+                                  S4_EVENT_METRICS.toString(),
+                                  "et",
+                                  eventWrapper.getStreamName());
+                monitor.increment(generic_listener_msg_in_ct.toString(),
+                                  1,
+                                  S4_CORE_METRICS.toString());
+            }
+        } catch (Exception e) {
+            logger.error("Exception in processEvent on thread "
+                    + Thread.currentThread().getId(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java b/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java
new file mode 100644
index 0000000..8dbb4ea
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/EventRecord.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.collector;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class EventRecord implements Map<String, Object> {
+
+    public static EventRecord EMPTY_RECORD = new EventRecord(new HashMap<String, Object>());
+    public static List<EventRecord> EMPTY_LIST = Collections.unmodifiableList(new ArrayList<EventRecord>());
+
+    private Map<String, Object> eventData = new HashMap<String, Object>();
+    private Map<String, Object> additionalData = new HashMap<String, Object>();
+
+    public EventRecord(Map<String, Object> passedEventData) {
+        this(passedEventData, true);
+    }
+
+    private EventRecord(Map<String, Object> passedEventData,
+            boolean processEventData) {
+        if (processEventData) {
+            eventData = processMap(passedEventData, true);
+        } else {
+            eventData = passedEventData;
+        }
+    }
+
+    private Map<String, Object> processMap(Map<String, Object> inputMap) {
+        return processMap(inputMap, false);
+    }
+
+    private Map<String, Object> processMap(Map<String, Object> inputMap, boolean returnRaw) {
+        Map<String, Object> eventData = new HashMap<String, Object>();
+        for (String key : inputMap.keySet()) {
+            Object value = inputMap.get(key);
+            if (value instanceof Map<?, ?>) {
+                eventData.put(key, processMap((Map<String, Object>) value));
+            } else if (value instanceof List<?>) {
+                eventData.put(key,
+                              processList((List<Map<String, Object>>) value));
+            } else {
+                eventData.put(key, value);
+            }
+        }
+        if (returnRaw)
+            return eventData;
+        return new EventRecord(eventData, false);
+    }
+
+    private List<Map<String, Object>> processList(List<Map<String, Object>> inputList) {
+        List<Map<String, Object>> eventList = new ArrayList<Map<String, Object>>();
+        for (Map<String, Object> inputMap : inputList) {
+            eventList.add(processMap(inputMap));
+        }
+        return Collections.unmodifiableList(eventList);
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return eventData.containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return eventData.containsValue(value);
+    }
+
+    @Override
+    public Set<java.util.Map.Entry<String, Object>> entrySet() {
+        return eventData.entrySet();
+    }
+
+    @Override
+    public Object get(Object key) {
+        return eventData.get(key);
+    }
+
+    public <T> T get(String key, T defaultValue) {
+        return get(key, defaultValue, eventData);
+    }
+
+    private <T> T get(String key, T defaultValue, Map<String, Object> map) {
+        Object value = map.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        return (T) value;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return eventData.isEmpty();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return eventData.keySet();
+    }
+
+    @Override
+    public Object put(String key, Object value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void putAll(Map<? extends String, ? extends Object> m) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Object remove(Object key) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int size() {
+        return eventData.size();
+    }
+
+    @Override
+    public Collection<Object> values() {
+        return eventData.values();
+    }
+
+    public void setAdditionalProperty(String key, Object value) {
+        additionalData.put(key, value);
+    }
+
+    public <T> T getAdditionalProperty(String key, T defaultValue) {
+        return get(key, defaultValue, additionalData);
+    }
+
+    public void removeAdditionalProperty(String key) {
+        additionalData.remove(key);
+    }
+
+    public Map<String, Object> getMutableMap() {
+        return getMutableMap(this.eventData);
+    }
+
+    public Map<String, Object> getMutableMap(Map<String, Object> recordData) {
+        Map<String, Object> mutableData = new HashMap<String, Object>();
+        for (String key : recordData.keySet()) {
+            Object value = recordData.get(key);
+            if (value instanceof Map<?, ?>) {
+                mutableData.put(key, getMutableMap((Map<String, Object>) value));
+            } else if (value instanceof List<?>) {
+                mutableData.put(key,
+                                getMutableList((List<Map<String, Object>>) value));
+            } else {
+                mutableData.put(key, value);
+            }
+        }
+        return mutableData;
+    }
+
+    public List<Map<String, Object>> getMutableList(List<Map<String, Object>> recordList) {
+        List<Map<String, Object>> mutableList = new ArrayList<Map<String, Object>>();
+        for (Map<String, Object> recordData : recordList) {
+            mutableList.add(getMutableMap(recordData));
+        }
+        return mutableList;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java b/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java
new file mode 100644
index 0000000..cc5b2a6
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/collector/EventWrapper.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.collector;
+
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+public class EventWrapper {
+    private List<CompoundKeyInfo> compoundKeys = null;
+    private List<List<String>> compoundKeyNames = null;
+    private Object event;
+    private String streamName;
+
+    public List<CompoundKeyInfo> getCompoundKeys() {
+        return compoundKeys;
+    }
+
+    public Object getEvent() {
+        return event;
+    }
+
+    public String getStreamName() {
+        return streamName;
+    }
+
+    public List<List<String>> getCompoundKeyNames() {
+        return compoundKeyNames;
+    }
+
+    public EventWrapper() {
+        compoundKeys = new ArrayList<CompoundKeyInfo>();
+    }
+
+    public EventWrapper(String streamName, Object event,
+            List<CompoundKeyInfo> compoundKeys) {
+        this.streamName = streamName;
+        this.event = event;
+        this.compoundKeys = compoundKeys;
+    }
+
+    public EventWrapper(String streamName, String[] compoundKeyStrings,
+            Object event) {
+        this.streamName = streamName;
+        this.event = event;
+
+        if (compoundKeyStrings != null) {
+            this.compoundKeyNames = new ArrayList<List<String>>(compoundKeyStrings.length);
+
+            for (String keyAsString : compoundKeyStrings) {
+                List<String> keyNameElements = new ArrayList<String>();
+                StringTokenizer st = new StringTokenizer(keyAsString, "/");
+                while (st.hasMoreTokens()) {
+                    keyNameElements.add(st.nextToken());
+                }
+                compoundKeyNames.add(keyNameElements);
+            }
+        }
+    }
+
+    public String toString() {
+        return "stream:" + getStreamName() + " keys:" + getCompoundKeys()
+                + " keyNames:" + getCompoundKeyNames() + " event:" + getEvent();
+    }
+}