You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC

[29/50] [abbrv] git commit: Rename packages in preparation for move to Apache

Rename packages in preparation for move to Apache


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/a7b4afb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/a7b4afb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/a7b4afb0

Branch: refs/heads/dev
Commit: a7b4afb095b47d0692913d0e157ecef2d2167a83
Parents: 64a901f
Author: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Authored: Thu Nov 17 23:07:32 2011 -0800
Committer: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Committed: Thu Nov 17 23:07:32 2011 -0800

----------------------------------------------------------------------
 s4-core/src/main/java/io/s4/MainApp.java           |  291 ------
 s4-core/src/main/java/io/s4/adapter/Adapter.java   |  208 ----
 s4-core/src/main/java/io/s4/client/Adapter.java    |  432 --------
 .../main/java/io/s4/client/ClientConnection.java   |  163 ---
 .../src/main/java/io/s4/client/ClientReadMode.java |   37 -
 s4-core/src/main/java/io/s4/client/ClientStub.java |  312 ------
 .../main/java/io/s4/client/ClientWriteMode.java    |   17 -
 .../java/io/s4/client/GenericJsonClientStub.java   |  107 --
 s4-core/src/main/java/io/s4/client/Handshake.java  |  223 -----
 s4-core/src/main/java/io/s4/client/IOChannel.java  |   24 -
 s4-core/src/main/java/io/s4/client/InputStub.java  |   21 -
 s4-core/src/main/java/io/s4/client/OutputStub.java |   26 -
 .../main/java/io/s4/client/util/ObjectBuilder.java |  200 ----
 s4-core/src/main/java/io/s4/collector/Event.java   |  117 ---
 .../main/java/io/s4/collector/EventListener.java   |   93 --
 .../src/main/java/io/s4/collector/EventRecord.java |  189 ----
 .../main/java/io/s4/collector/EventWrapper.java    |   80 --
 .../src/main/java/io/s4/dispatcher/Dispatcher.java |  227 -----
 .../java/io/s4/dispatcher/EventDispatcher.java     |   47 -
 .../java/io/s4/dispatcher/MultiDispatcher.java     |   49 -
 .../s4/dispatcher/StreamExcludingDispatcher.java   |   52 -
 .../s4/dispatcher/StreamSelectingDispatcher.java   |   52 -
 .../partitioner/BroadcastPartitioner.java          |   43 -
 .../s4/dispatcher/partitioner/CompoundKeyInfo.java |   77 --
 .../s4/dispatcher/partitioner/DefaultHasher.java   |   27 -
 .../dispatcher/partitioner/DefaultPartitioner.java |  351 -------
 .../dispatcher/partitioner/DummyPartitioner.java   |   33 -
 .../s4/dispatcher/partitioner/HashAlgorithm.java   |  160 ---
 .../java/io/s4/dispatcher/partitioner/Hasher.java  |   20 -
 .../java/io/s4/dispatcher/partitioner/KeyInfo.java |  125 ---
 .../partitioner/LoopbackPartitioner.java           |   69 --
 .../io/s4/dispatcher/partitioner/Partitioner.java  |   22 -
 .../partitioner/RoundRobinPartitioner.java         |   60 --
 .../partitioner/TestDefaultPartitioner.java        |  107 --
 .../partitioner/VariableKeyPartitioner.java        |   24 -
 .../io/s4/dispatcher/transformer/Transformer.java  |   21 -
 .../main/java/io/s4/emitter/CommLayerEmitter.java  |  261 -----
 .../src/main/java/io/s4/emitter/EventEmitter.java  |   24 -
 .../src/main/java/io/s4/ft/CheckpointingEvent.java |   53 -
 .../io/s4/ft/DefaultFileSystemStateStorage.java    |  242 -----
 .../java/io/s4/ft/InitiateCheckpointingEvent.java  |   35 -
 .../io/s4/ft/LoggingStorageCallbackFactory.java    |   54 -
 s4-core/src/main/java/io/s4/ft/RecoveryEvent.java  |   33 -
 .../src/main/java/io/s4/ft/RedisStateStorage.java  |  127 ---
 s4-core/src/main/java/io/s4/ft/SafeKeeper.java     |  257 -----
 s4-core/src/main/java/io/s4/ft/SafeKeeperId.java   |  119 ---
 s4-core/src/main/java/io/s4/ft/SaveStateTask.java  |   43 -
 s4-core/src/main/java/io/s4/ft/StateStorage.java   |   67 --
 .../src/main/java/io/s4/ft/StorageCallback.java    |   34 -
 .../main/java/io/s4/ft/StorageCallbackFactory.java |   33 -
 s4-core/src/main/java/io/s4/ft/package.html        |   23 -
 .../java/io/s4/listener/CommLayerListener.java     |  279 ------
 .../src/main/java/io/s4/listener/EventHandler.java |   23 -
 .../main/java/io/s4/listener/EventListener.java    |   23 -
 .../main/java/io/s4/listener/EventProducer.java    |   23 -
 .../src/main/java/io/s4/logger/Log4jMonitor.java   |  118 ---
 s4-core/src/main/java/io/s4/logger/Monitor.java    |   30 -
 .../src/main/java/io/s4/logger/TraceMessage.java   |   40 -
 .../main/java/io/s4/message/PrototypeRequest.java  |  111 --
 s4-core/src/main/java/io/s4/message/Request.java   |  181 ----
 s4-core/src/main/java/io/s4/message/Response.java  |   92 --
 .../main/java/io/s4/message/SinglePERequest.java   |  137 ---
 .../main/java/io/s4/persist/ConMapPersister.java   |  192 ----
 .../main/java/io/s4/persist/DumpingPersister.java  |  162 ---
 .../main/java/io/s4/persist/HashMapPersister.java  |  207 ----
 s4-core/src/main/java/io/s4/persist/Persister.java |  186 ----
 .../src/main/java/io/s4/processor/AbstractPE.java  |  778 ---------------
 .../java/io/s4/processor/AbstractWindowingPE.java  |  196 ----
 .../s4/processor/AsynchronousEventProcessor.java   |   28 -
 .../io/s4/processor/ControlEventProcessor.java     |   79 --
 .../src/main/java/io/s4/processor/EventAdvice.java |   38 -
 s4-core/src/main/java/io/s4/processor/JoinPE.java  |  194 ----
 .../main/java/io/s4/processor/OutputFormatter.java |   20 -
 .../java/io/s4/processor/OverloadDispatcher.java   |   20 -
 .../s4/processor/OverloadDispatcherGenerator.java  |  313 ------
 .../io/s4/processor/OverloadDispatcherSlot.java    |   20 -
 .../src/main/java/io/s4/processor/PEContainer.java |  431 --------
 .../main/java/io/s4/processor/PrintEventPE.java    |   31 -
 .../java/io/s4/processor/PrototypeWrapper.java     |  129 ---
 .../src/main/java/io/s4/processor/ReroutePE.java   |  104 --
 .../java/io/s4/processor/SimpleCountingPE.java     |   92 --
 s4-core/src/main/java/io/s4/schema/Schema.java     |  293 ------
 .../main/java/io/s4/schema/SchemaContainer.java    |   37 -
 .../src/main/java/io/s4/schema/SchemaManager.java  |   92 --
 .../main/java/io/s4/serialize/KryoSerDeser.java    |   75 --
 .../io/s4/serialize/SerializerDeserializer.java    |   22 -
 .../java/io/s4/test/TestPersisterEventClock.java   |  152 ---
 .../java/io/s4/test/TestPersisterWallClock.java    |  122 ---
 .../main/java/io/s4/util/ByteArrayIOChannel.java   |   94 --
 s4-core/src/main/java/io/s4/util/Cloner.java       |   20 -
 .../src/main/java/io/s4/util/ClonerGenerator.java  |  149 ---
 .../java/io/s4/util/DoubleOutputFormatter.java     |   44 -
 s4-core/src/main/java/io/s4/util/GsonUtil.java     |   74 --
 s4-core/src/main/java/io/s4/util/KeyUtil.java      |   59 --
 .../src/main/java/io/s4/util/LoadGenerator.java    |  640 ------------
 .../src/main/java/io/s4/util/MethodInvoker.java    |  124 ---
 s4-core/src/main/java/io/s4/util/MetricsName.java  |   51 -
 .../src/main/java/io/s4/util/MiscConstants.java    |   20 -
 s4-core/src/main/java/io/s4/util/NumberUtils.java  |   50 -
 .../src/main/java/io/s4/util/PreprodLogger.java    |   59 --
 .../io/s4/util/ReverseDoubleOutputFormatter.java   |   28 -
 .../io/s4/util/ReverseIntegerOutputFormatter.java  |   28 -
 s4-core/src/main/java/io/s4/util/S4Util.java       |   25 -
 s4-core/src/main/java/io/s4/util/SlotUtils.java    |   60 --
 .../java/io/s4/util/ToStringOutputFormatter.java   |   26 -
 s4-core/src/main/java/io/s4/util/Watcher.java      |  174 ----
 s4-core/src/main/java/io/s4/util/clock/Clock.java  |   25 -
 .../java/io/s4/util/clock/ClockStreamsLoader.java  |   47 -
 .../main/java/io/s4/util/clock/DrivenClock.java    |   87 --
 .../src/main/java/io/s4/util/clock/EventClock.java |   74 --
 .../main/java/io/s4/util/clock/TimerRequest.java   |   46 -
 .../src/main/java/io/s4/util/clock/WallClock.java  |   41 -
 s4-core/src/main/java/org/apache/s4/MainApp.java   |  291 ++++++
 .../main/java/org/apache/s4/adapter/Adapter.java   |  208 ++++
 .../main/java/org/apache/s4/client/Adapter.java    |  432 ++++++++
 .../org/apache/s4/client/ClientConnection.java     |  163 +++
 .../java/org/apache/s4/client/ClientReadMode.java  |   37 +
 .../main/java/org/apache/s4/client/ClientStub.java |  312 ++++++
 .../java/org/apache/s4/client/ClientWriteMode.java |   17 +
 .../apache/s4/client/GenericJsonClientStub.java    |  107 ++
 .../main/java/org/apache/s4/client/Handshake.java  |  223 +++++
 .../main/java/org/apache/s4/client/IOChannel.java  |   24 +
 .../main/java/org/apache/s4/client/InputStub.java  |   21 +
 .../main/java/org/apache/s4/client/OutputStub.java |   26 +
 .../org/apache/s4/client/util/ObjectBuilder.java   |  200 ++++
 .../main/java/org/apache/s4/collector/Event.java   |  117 +++
 .../org/apache/s4/collector/EventListener.java     |   93 ++
 .../java/org/apache/s4/collector/EventRecord.java  |  189 ++++
 .../java/org/apache/s4/collector/EventWrapper.java |   80 ++
 .../java/org/apache/s4/dispatcher/Dispatcher.java  |  227 +++++
 .../org/apache/s4/dispatcher/EventDispatcher.java  |   47 +
 .../org/apache/s4/dispatcher/MultiDispatcher.java  |   49 +
 .../s4/dispatcher/StreamExcludingDispatcher.java   |   52 +
 .../s4/dispatcher/StreamSelectingDispatcher.java   |   52 +
 .../partitioner/BroadcastPartitioner.java          |   43 +
 .../s4/dispatcher/partitioner/CompoundKeyInfo.java |   77 ++
 .../s4/dispatcher/partitioner/DefaultHasher.java   |   27 +
 .../dispatcher/partitioner/DefaultPartitioner.java |  351 +++++++
 .../dispatcher/partitioner/DummyPartitioner.java   |   33 +
 .../s4/dispatcher/partitioner/HashAlgorithm.java   |  160 +++
 .../apache/s4/dispatcher/partitioner/Hasher.java   |   20 +
 .../apache/s4/dispatcher/partitioner/KeyInfo.java  |  125 +++
 .../partitioner/LoopbackPartitioner.java           |   69 ++
 .../s4/dispatcher/partitioner/Partitioner.java     |   22 +
 .../partitioner/RoundRobinPartitioner.java         |   60 ++
 .../partitioner/TestDefaultPartitioner.java        |  107 ++
 .../partitioner/VariableKeyPartitioner.java        |   24 +
 .../s4/dispatcher/transformer/Transformer.java     |   21 +
 .../org/apache/s4/emitter/CommLayerEmitter.java    |  261 +++++
 .../java/org/apache/s4/emitter/EventEmitter.java   |   24 +
 .../java/org/apache/s4/ft/CheckpointingEvent.java  |   53 +
 .../s4/ft/DefaultFileSystemStateStorage.java       |  242 +++++
 .../apache/s4/ft/InitiateCheckpointingEvent.java   |   35 +
 .../s4/ft/LoggingStorageCallbackFactory.java       |   54 +
 .../main/java/org/apache/s4/ft/RecoveryEvent.java  |   33 +
 .../java/org/apache/s4/ft/RedisStateStorage.java   |  127 +++
 .../src/main/java/org/apache/s4/ft/SafeKeeper.java |  257 +++++
 .../main/java/org/apache/s4/ft/SafeKeeperId.java   |  119 +++
 .../main/java/org/apache/s4/ft/SaveStateTask.java  |   43 +
 .../main/java/org/apache/s4/ft/StateStorage.java   |   67 ++
 .../java/org/apache/s4/ft/StorageCallback.java     |   34 +
 .../org/apache/s4/ft/StorageCallbackFactory.java   |   33 +
 .../src/main/java/org/apache/s4/ft/package.html    |   23 +
 .../org/apache/s4/listener/CommLayerListener.java  |  279 ++++++
 .../java/org/apache/s4/listener/EventHandler.java  |   23 +
 .../java/org/apache/s4/listener/EventListener.java |   23 +
 .../java/org/apache/s4/listener/EventProducer.java |   23 +
 .../java/org/apache/s4/logger/Log4jMonitor.java    |  118 +++
 .../main/java/org/apache/s4/logger/Monitor.java    |   30 +
 .../java/org/apache/s4/logger/TraceMessage.java    |   40 +
 .../org/apache/s4/message/PrototypeRequest.java    |  111 ++
 .../main/java/org/apache/s4/message/Request.java   |  181 ++++
 .../main/java/org/apache/s4/message/Response.java  |   92 ++
 .../org/apache/s4/message/SinglePERequest.java     |  137 +++
 .../org/apache/s4/persist/ConMapPersister.java     |  192 ++++
 .../org/apache/s4/persist/DumpingPersister.java    |  162 +++
 .../org/apache/s4/persist/HashMapPersister.java    |  207 ++++
 .../main/java/org/apache/s4/persist/Persister.java |  186 ++++
 .../java/org/apache/s4/processor/AbstractPE.java   |  778 +++++++++++++++
 .../apache/s4/processor/AbstractWindowingPE.java   |  196 ++++
 .../s4/processor/AsynchronousEventProcessor.java   |   28 +
 .../apache/s4/processor/ControlEventProcessor.java |   79 ++
 .../java/org/apache/s4/processor/EventAdvice.java  |   38 +
 .../main/java/org/apache/s4/processor/JoinPE.java  |  194 ++++
 .../org/apache/s4/processor/OutputFormatter.java   |   20 +
 .../apache/s4/processor/OverloadDispatcher.java    |   20 +
 .../s4/processor/OverloadDispatcherGenerator.java  |  313 ++++++
 .../s4/processor/OverloadDispatcherSlot.java       |   20 +
 .../java/org/apache/s4/processor/PEContainer.java  |  431 ++++++++
 .../java/org/apache/s4/processor/PrintEventPE.java |   31 +
 .../org/apache/s4/processor/PrototypeWrapper.java  |  129 +++
 .../java/org/apache/s4/processor/ReroutePE.java    |  104 ++
 .../org/apache/s4/processor/SimpleCountingPE.java  |   92 ++
 .../src/main/java/org/apache/s4/schema/Schema.java |  293 ++++++
 .../java/org/apache/s4/schema/SchemaContainer.java |   37 +
 .../java/org/apache/s4/schema/SchemaManager.java   |   92 ++
 .../java/org/apache/s4/serialize/KryoSerDeser.java |   75 ++
 .../s4/serialize/SerializerDeserializer.java       |   22 +
 .../apache/s4/test/TestPersisterEventClock.java    |  152 +++
 .../org/apache/s4/test/TestPersisterWallClock.java |  122 +++
 .../org/apache/s4/util/ByteArrayIOChannel.java     |   94 ++
 .../src/main/java/org/apache/s4/util/Cloner.java   |   20 +
 .../java/org/apache/s4/util/ClonerGenerator.java   |  149 +++
 .../org/apache/s4/util/DoubleOutputFormatter.java  |   44 +
 .../src/main/java/org/apache/s4/util/GsonUtil.java |   74 ++
 .../src/main/java/org/apache/s4/util/KeyUtil.java  |   59 ++
 .../java/org/apache/s4/util/LoadGenerator.java     |  640 ++++++++++++
 .../java/org/apache/s4/util/MethodInvoker.java     |  124 +++
 .../main/java/org/apache/s4/util/MetricsName.java  |   51 +
 .../java/org/apache/s4/util/MiscConstants.java     |   20 +
 .../main/java/org/apache/s4/util/NumberUtils.java  |   50 +
 .../java/org/apache/s4/util/PreprodLogger.java     |   59 ++
 .../s4/util/ReverseDoubleOutputFormatter.java      |   28 +
 .../s4/util/ReverseIntegerOutputFormatter.java     |   28 +
 .../src/main/java/org/apache/s4/util/S4Util.java   |   25 +
 .../main/java/org/apache/s4/util/SlotUtils.java    |   60 ++
 .../apache/s4/util/ToStringOutputFormatter.java    |   26 +
 .../src/main/java/org/apache/s4/util/Watcher.java  |  174 ++++
 .../main/java/org/apache/s4/util/clock/Clock.java  |   25 +
 .../apache/s4/util/clock/ClockStreamsLoader.java   |   47 +
 .../java/org/apache/s4/util/clock/DrivenClock.java |   87 ++
 .../java/org/apache/s4/util/clock/EventClock.java  |   74 ++
 .../org/apache/s4/util/clock/TimerRequest.java     |   46 +
 .../java/org/apache/s4/util/clock/WallClock.java   |   41 +
 224 files changed, 12626 insertions(+), 12626 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/MainApp.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/MainApp.java b/s4-core/src/main/java/io/s4/MainApp.java
deleted file mode 100644
index 78c195b..0000000
--- a/s4-core/src/main/java/io/s4/MainApp.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4;
-
-import io.s4.ft.SafeKeeper;
-import io.s4.processor.AbstractPE;
-import io.s4.processor.PEContainer;
-import io.s4.util.S4Util;
-import io.s4.util.Watcher;
-import io.s4.util.clock.Clock;
-import io.s4.util.clock.EventClock;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-import org.springframework.core.io.ClassPathResource;
-
-
-public class MainApp {
-
-    private static String coreHome = "../s4-core";
-    private static String appsHome = "../s4-apps";
-    private static String extsHome = "../s4-exts";
-
-    public static void main(String args[]) throws Exception {
-        Options options = new Options();
-
-        options.addOption(OptionBuilder.withArgName("corehome")
-                                       .hasArg()
-                                       .withDescription("core home")
-                                       .create("c"));
-
-        options.addOption(OptionBuilder.withArgName("appshome")
-                                       .hasArg()
-                                       .withDescription("applications home")
-                                       .create("a"));
-
-        options.addOption(OptionBuilder.withArgName("s4clock")
-                                       .hasArg()
-                                       .withDescription("s4 clock")
-                                       .create("d"));
-
-        options.addOption(OptionBuilder.withArgName("seedtime")
-                                       .hasArg()
-                                       .withDescription("event clock initialization time")
-                                       .create("s"));        
-        
-        options.addOption(OptionBuilder.withArgName("extshome")
-                                       .hasArg()
-                                       .withDescription("extensions home")
-                                       .create("e"));
-
-        options.addOption(OptionBuilder.withArgName("instanceid")
-                                       .hasArg()
-                                       .withDescription("instance id")
-                                       .create("i"));
-
-        options.addOption(OptionBuilder.withArgName("configtype")
-                                       .hasArg()
-                                       .withDescription("configuration type")
-                                       .create("t"));
-
-        CommandLineParser parser = new GnuParser();
-        CommandLine commandLine = null;
-        String clockType = "wall";
-
-        try {
-            commandLine = parser.parse(options, args);
-        } catch (ParseException pe) {
-            System.err.println(pe.getLocalizedMessage());
-            System.exit(1);
-        }
-
-        int instanceId = -1;
-        if (commandLine.hasOption("i")) {
-            String instanceIdStr = commandLine.getOptionValue("i");
-            try {
-                instanceId = Integer.parseInt(instanceIdStr);
-            } catch (NumberFormatException nfe) {
-                System.err.println("Bad instance id: %s" + instanceIdStr);
-                System.exit(1);
-            }
-        }
-
-        if (commandLine.hasOption("c")) {
-            coreHome = commandLine.getOptionValue("c");
-        }
-
-        if (commandLine.hasOption("a")) {
-            appsHome = commandLine.getOptionValue("a");
-        }
-        
-        if (commandLine.hasOption("d")) {
-            clockType = commandLine.getOptionValue("d");
-        }
-
-        if (commandLine.hasOption("e")) {
-            extsHome = commandLine.getOptionValue("e");
-        }
-
-        String configType = "typical";
-        if (commandLine.hasOption("t")) {
-            configType = commandLine.getOptionValue("t");
-        }
-        
-        long seedTime = 0;
-        if (commandLine.hasOption("s")) {
-            seedTime = Long.parseLong(commandLine.getOptionValue("s"));
-        }
-
-        File coreHomeFile = new File(coreHome);
-        if (!coreHomeFile.isDirectory()) {
-            System.err.println("Bad core home: " + coreHome);
-            System.exit(1);
-        }
-
-        File appsHomeFile = new File(appsHome);
-        if (!appsHomeFile.isDirectory()) {
-            System.err.println("Bad applications home: " + appsHome);
-            System.exit(1);
-        }
-
-        if (instanceId > -1) {
-            System.setProperty("instanceId", "" + instanceId);
-        } else {
-            System.setProperty("instanceId", "" + S4Util.getPID());
-        }
-
-        List loArgs = commandLine.getArgList();
-
-        if (loArgs.size() < 1) {
-            // System.err.println("No bean configuration file specified");
-            // System.exit(1);
-        }
-
-        // String s4ConfigXml = (String) loArgs.get(0);
-        // System.out.println("s4ConfigXml is " + s4ConfigXml);
-
-        ClassPathResource propResource = new ClassPathResource("s4-core.properties");
-        Properties prop = new Properties();
-        if (propResource.exists()) {
-            prop.load(propResource.getInputStream());
-        } else {
-            System.err.println("Unable to find s4-core.properties. It must be available in classpath");
-            System.exit(1);
-        }
-
-        ApplicationContext coreContext = null;
-        String configBase = coreHome + File.separatorChar + "conf"
-                + File.separatorChar + configType;
-        String configPath = "";
-        List<String> coreConfigUrls = new ArrayList<String>(); 
-        File configFile = null;
-
-        // load clock configuration
-        configPath = configBase + File.separatorChar + clockType + "-clock.xml";            
-        coreConfigUrls.add(configPath);
-
-        // load core config xml
-        configPath = configBase + File.separatorChar + "s4-core-conf.xml";
-        configFile = new File(configPath);
-        if (!configFile.exists()) {
-            System.err.printf("S4 core config file %s does not exist\n",
-                    configPath);
-            System.exit(1);
-        }
-		
-        coreConfigUrls.add(configPath);
-        String[] coreConfigFiles = new String[coreConfigUrls.size()];
-        coreConfigUrls.toArray(coreConfigFiles);
-
-        String[] coreConfigFileUrls = new String[coreConfigFiles.length];
-        for (int i = 0; i < coreConfigFiles.length; i++) {
-            coreConfigFileUrls[i] = "file:" + coreConfigFiles[i];
-        }
-
-        coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls, coreContext);
-        ApplicationContext context = coreContext;        
-        
-        Clock clock = (Clock) context.getBean("clock");
-        if (clock instanceof EventClock && seedTime > 0) {
-            EventClock s4EventClock = (EventClock)clock;
-            s4EventClock.updateTime(seedTime);
-            System.out.println("Intializing event clock time with seed time " + s4EventClock.getCurrentTime());
-        }
-        
-        PEContainer peContainer = (PEContainer) context.getBean("peContainer");
-
-        Watcher w = (Watcher) context.getBean("watcher");
-        w.setConfigFilename(configPath);
-
-        
-        // load extension modules
-        String[] configFileNames = getModuleConfigFiles(extsHome, prop);
-        if (configFileNames.length > 0) {
-            String[] configFileUrls = new String[configFileNames.length];
-            for (int i = 0; i < configFileNames.length; i++) {
-                configFileUrls[i] = "file:" + configFileNames[i];
-            }
-            context = new FileSystemXmlApplicationContext(configFileUrls,
-                                                          context);
-        }
-
-        // load application modules
-        configFileNames = getModuleConfigFiles(appsHome, prop);
-        if (configFileNames.length > 0) {
-            String[] configFileUrls = new String[configFileNames.length];
-            for (int i = 0; i < configFileNames.length; i++) {
-                configFileUrls[i] = "file:" + configFileNames[i];
-            }
-            context = new FileSystemXmlApplicationContext(configFileUrls,
-                                                          context);
-            // attach any beans that implement ProcessingElement to the PE
-            // Container
-            String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class);
-            for (String processingElementBeanName : processingElementBeanNames) {
-                AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
-                bean.setClock(clock);
-                try {
-                    bean.setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
-                } catch (NoSuchBeanDefinitionException ignored) {
-                    // no safe keeper = no checkpointing / recovery
-                }
-                // if the application did not specify an id, use the Spring bean name
-                if (bean.getId() == null) {
-                    bean.setId(processingElementBeanName);
-                }
-                System.out.println("Adding processing element with bean name "
-                        + processingElementBeanName + ", id "
-                        + ((AbstractPE) bean).getId());
-                peContainer.addProcessor((AbstractPE) bean);
-            }
-        }  
-    }
-
-    /**
-     * 
-     * @param prop
-     * @return
-     */
-    private static String[] getModuleConfigFiles(String moduleBase, Properties prop) {
-        List<String> configFileList = new ArrayList<String>();
-        File moduleBaseFile = new File(moduleBase);
-
-        // list applications
-        File[] moduleDirs = moduleBaseFile.listFiles();
-        for (File moduleDir : moduleDirs) {
-            if (moduleDir.isDirectory()) {
-                String confFileName = moduleDir.getAbsolutePath() + "/"
-                        + moduleDir.getName() + "-conf.xml";
-                File appsConfFile = new File(confFileName);
-                if (appsConfFile.exists()) {
-                    configFileList.add(appsConfFile.getAbsolutePath());
-                } else {
-                    System.err.println("Invalid application: " + moduleDir);
-                }
-            }
-        }
-        String[] ret = new String[configFileList.size()];
-        configFileList.toArray(ret);
-        System.out.println(Arrays.toString(ret));
-        return ret;
-    }    
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/adapter/Adapter.java b/s4-core/src/main/java/io/s4/adapter/Adapter.java
deleted file mode 100644
index c31ee97..0000000
--- a/s4-core/src/main/java/io/s4/adapter/Adapter.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.adapter;
-
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.listener.EventHandler;
-import io.s4.listener.EventListener;
-import io.s4.listener.EventProducer;
-import io.s4.util.S4Util;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-
-public class Adapter implements EventHandler {
-    private static String coreHome = "../s4_core";
-
-    private EventDispatcher dispatcher;
-    private EventProducer[] eventListeners;
-    private String configFilename;
-
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    public void setEventListeners(EventProducer[] eventListeners) {
-        this.eventListeners = eventListeners;
-        for (EventProducer eventListener : eventListeners) {
-            eventListener.addHandler(this);
-        }
-    }
-
-    public void setConfigFilename(String configFilename) {
-        this.configFilename = configFilename;
-    }
-
-    private volatile int eventCount = 0;
-    private volatile int rawEventCount = 0;
-
-    public Adapter() {
-
-    }
-
-    int counts[];
-
-    private boolean init = false;
-
-    public void init() {
-        synchronized (this) {
-            init = true;
-        }
-    }
-
-    public void processEvent(EventWrapper eventWrapper) {
-        try {
-            synchronized (this) {
-                if (!init) {
-                    return;
-                }
-                rawEventCount++;
-                eventCount++;
-            }
-            dispatcher.dispatchEvent(eventWrapper.getStreamName(),
-                                     eventWrapper.getEvent());
-        } catch (Exception e) {
-            Logger.getLogger("dispatcher").info("Exception adapting event", e);
-        }
-    }
-
-    public static void main(String args[]) {
-        Options options = new Options();
-
-        options.addOption(OptionBuilder.withArgName("corehome")
-                                       .hasArg()
-                                       .withDescription("core home")
-                                       .create("c"));
-
-        options.addOption(OptionBuilder.withArgName("instanceid")
-                                       .hasArg()
-                                       .withDescription("instance id")
-                                       .create("i"));
-
-        options.addOption(OptionBuilder.withArgName("configtype")
-                                       .hasArg()
-                                       .withDescription("configuration type")
-                                       .create("t"));
-
-        options.addOption(OptionBuilder.withArgName("userconfig")
-                                       .hasArg()
-                                       .withDescription("user-defined legacy data adapter configuration file")
-                                       .create("d"));
-
-        CommandLineParser parser = new GnuParser();
-        CommandLine commandLine = null;
-
-        try {
-            commandLine = parser.parse(options, args);
-        } catch (ParseException pe) {
-            System.err.println(pe.getLocalizedMessage());
-            System.exit(1);
-        }
-
-        int instanceId = -1;
-        if (commandLine.hasOption("i")) {
-            String instanceIdStr = commandLine.getOptionValue("i");
-            try {
-                instanceId = Integer.parseInt(instanceIdStr);
-            } catch (NumberFormatException nfe) {
-                System.err.println("Bad instance id: %s" + instanceIdStr);
-                System.exit(1);
-            }
-        }
-
-        if (commandLine.hasOption("c")) {
-            coreHome = commandLine.getOptionValue("c");
-        }
-
-        String configType = "typical";
-        if (commandLine.hasOption("t")) {
-            configType = commandLine.getOptionValue("t");
-        }
-
-        String userConfigFilename = null;
-        if (commandLine.hasOption("d")) {
-            userConfigFilename = commandLine.getOptionValue("d");
-        }
-
-        File userConfigFile = new File(userConfigFilename);
-        if (!userConfigFile.isFile()) {
-            System.err.println("Bad user configuration file: "
-                    + userConfigFilename);
-            System.exit(1);
-        }
-
-        File coreHomeFile = new File(coreHome);
-        if (!coreHomeFile.isDirectory()) {
-            System.err.println("Bad core home: " + coreHome);
-            System.exit(1);
-        }
-
-        if (instanceId > -1) {
-            System.setProperty("instanceId", "" + instanceId);
-        } else {
-            System.setProperty("instanceId", "" + S4Util.getPID());
-        }
-
-        String configBase = coreHome + File.separatorChar + "conf"
-                + File.separatorChar + configType;
-        String configPath = configBase + File.separatorChar
-                + "adapter-conf.xml";
-        File configFile = new File(configPath);
-        if (!configFile.exists()) {
-            System.err.printf("adapter config file %s does not exist\n",
-                              configPath);
-            System.exit(1);
-        }
-
-        // load adapter config xml
-        ApplicationContext coreContext;
-        coreContext = new FileSystemXmlApplicationContext("file:" + configPath);
-        ApplicationContext context = coreContext;
-
-        Adapter adapter = (Adapter) context.getBean("adapter");
-
-        ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
-                                                                                    + userConfigFilename },
-                                                                            context);
-        Map listenerBeanMap = appContext.getBeansOfType(EventProducer.class);
-        if (listenerBeanMap.size() == 0) {
-            System.err.println("No user-defined listener beans");
-            System.exit(1);
-        }
-        EventProducer[] eventListeners = new EventProducer[listenerBeanMap.size()];
-
-        int index = 0;
-        for (Iterator it = listenerBeanMap.keySet().iterator(); it.hasNext(); index++) {
-            String beanName = (String) it.next();
-            System.out.println("Adding producer " + beanName);
-            eventListeners[index] = (EventProducer) listenerBeanMap.get(beanName);
-        }
-
-        adapter.setEventListeners(eventListeners);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/Adapter.java b/s4-core/src/main/java/io/s4/client/Adapter.java
deleted file mode 100644
index fb9c770..0000000
--- a/s4-core/src/main/java/io/s4/client/Adapter.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import io.s4.collector.EventListener;
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.listener.EventHandler;
-import io.s4.message.Request;
-import io.s4.processor.AsynchronousEventProcessor;
-import io.s4.util.S4Util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-
-/**
- * Adapter to connect an S4 cluster to an external client to read from and write
- * into the S4 cluster.
- */
-public class Adapter {
-    private static String coreHome = "../s4_core";
-
-    // Accept events from clients and send into S4 cluster.
-    private EventDispatcher dispatcher;
-    private Writer eventWriter = new Writer();
-
-    // Listen to events from S4 cluster and send to clients.
-    private io.s4.collector.EventListener clusterEventListener;
-    private Reader eventReader = new Reader();
-
-    /**
-     * Set the dispatcher to use for sending events into S4.
-     * 
-     * @param dispatcher
-     */
-    public void setDispatcher(EventDispatcher dispatcher) {
-        this.dispatcher = dispatcher;
-    }
-
-    /**
-     * clusterEventListener receives events from S4 cluster. Processing of
-     * events is delegated to the eventReader, which typically forwards them to
-     * appropriate clients.
-     * 
-     * @param clusterEventListener
-     */
-    public void setClusterEventListener(EventListener clusterEventListener) {
-        this.clusterEventListener = clusterEventListener;
-        this.clusterEventListener.setEventProcessor(eventReader);
-    }
-
-    public Adapter() {
-    }
-
-    int counts[];
-
-    private boolean init = false;
-
-    public void init() {
-        synchronized (this) {
-            init = true;
-        }
-    }
-
-    /**
-     * Register a list of InputStubs that will send events into the S4 cluster.
-     * These events will be processed by the eventWriter: i.e. the eventWriter
-     * dispatches them into the S4 cluster.
-     * 
-     * @param stubs
-     */
-    public void setInputStubs(List<InputStub> stubs) {
-        for (InputStub stub : stubs) {
-            // register the writer as the handler for the stub's events
-            stub.addHandler(eventWriter);
-        }
-    }
-
-    // collections of eventReceivers (OutputStubs) to which events will be sent
-    // for forwarding to clients.
-    HashMap<String, List<OutputStub>> eventReceivers = new HashMap<String, List<OutputStub>>();
-    List<OutputStub> eventReceiversAny = new ArrayList<OutputStub>();
-    List<OutputStub> eventReceiversAll = new ArrayList<OutputStub>();
-
-    /**
-     * Register a list of OutputStubs which process events received from the S4
-     * cluster, typically forwarding them to clients.
-     * 
-     * The {@link OutputStub#getAcceptedStreams()} is called to determine which
-     * streams the stub is interested in receiving. Accordingly, three
-     * collections of stubs are created.
-     * 
-     * <ol>
-     * <li>A mapping from stream name to OutputStubs (One-to-many). Used for
-     * routing.</li>
-     * <li>A list of OutputStubs that accept all streams (
-     * {@code stub.getAcceptedStreams() == null}. Used for routing.</li>
-     * <li>A list of all OutputStubs. This is used to iterate over all the stubs
-     * exactly once.</li>
-     * </ol>
-     * 
-     * @param stubs
-     *            the list of output stubs.
-     */
-    public void setOutputStubs(List<OutputStub> stubs) {
-        eventReceiversAll.addAll(stubs);
-
-        for (OutputStub stub : stubs) {
-            // update mapping of stream names to stubs that accept events on
-            // that stream.
-            List<String> streams = stub.getAcceptedStreams();
-            if (streams != null) {
-                for (String stream : streams) {
-                    List<OutputStub> stubList = eventReceivers.get(stream);
-                    if (stubList == null) {
-                        stubList = new ArrayList<OutputStub>();
-                        eventReceivers.put(stream, stubList);
-                    }
-
-                    stubList.add(stub);
-                }
-            } else {
-                eventReceiversAny.add(stub);
-            }
-        }
-    }
-
-    /**
-     * Write events from input stubs into S4 cluster.
-     */
-    private class Writer implements EventHandler {
-        // events to be dispatched into cluster
-        public void processEvent(EventWrapper eventWrapper) {
-            try {
-                synchronized (this) {
-                    if (!init) {
-                        return;
-                    }
-                    rawEventCount++;
-                    eventCount++;
-                }
-
-                // null keys => round-robin
-                // empty key list => default partitioning of underlying
-                // partitioner.
-                List<List<String>> keys = eventWrapper.getCompoundKeyNames();
-
-                String stream = eventWrapper.getStreamName();
-
-                Object event = eventWrapper.getEvent();
-
-                if (event instanceof Request)
-                    decorateRequest((Request) event);
-
-                dispatcher.dispatchEvent(stream, keys, event);
-
-            } catch (Exception e) {
-                Logger.getLogger("adapter").info("Exception adapting event", e);
-            }
-        }
-
-        private volatile int eventCount = 0;
-        private volatile int rawEventCount = 0;
-
-        // set the return stream name to the adapter cluster name
-        private void decorateRequest(Request r) {
-            Request.RInfo rinfo = r.getRInfo();
-
-            if (rinfo instanceof Request.ClientRInfo) {
-                String cname = clusterEventListener.getRawListener()
-                                                   .getAppName();
-
-                ((Request.ClientRInfo) rinfo).setStream("@" + cname);
-            }
-        }
-    }
-
-    /**
-     * Read events from S4 cluster.
-     */
-    private class Reader implements AsynchronousEventProcessor {
-        /**
-         * Queue work for processing by OutputStubs. This method simply queues
-         * the events in the appropriate stubs. An event from stream K is queued
-         * in OutputStub S if either S accepts all streams, or K is contained in
-         * the list {@code S.getAcceptedStreams()}.
-         */
-        @Override
-        public void queueWork(EventWrapper eventWrapper) {
-            List<OutputStub> stubs = eventReceivers.get(eventWrapper.getStreamName());
-
-            // stubs that accept any stream
-            for (OutputStub stub : eventReceiversAny)
-                stub.queueWork(eventWrapper);
-
-            // stubs that receive this stream in particular
-            if (stubs != null)
-                for (OutputStub stub : stubs)
-                    stub.queueWork(eventWrapper);
-        }
-
-        @Override
-        public int getQueueSize() {
-            int sz = 0;
-
-            for (OutputStub stub : eventReceiversAll)
-                sz += stub.getQueueSize();
-
-            return sz;
-        }
-
-    }
-
-    private static class TestDispatcher implements EventDispatcher {
-        @Override
-        public void dispatchEvent(String s, Object e) {
-            System.out.println("Dispatching event: " + s + ":" + e);
-        }
-
-        @Override
-        public void dispatchEvent(String s, List<List<String>> k, Object e) {
-            System.out.println("Dispatching event: " + s + ":" + k + ":" + e);
-        }
-    }
-
-    private static class TestReturnType {
-        private int ra;
-        private int rb;
-
-        @SuppressWarnings("unused")
-        TestReturnType() {
-        }
-
-        TestReturnType(int a, int b) {
-            this.ra = a;
-            this.rb = b;
-        }
-
-        public String toString() {
-            return "ra=" + ra + " rb=" + rb;
-        }
-    }
-
-    @SuppressWarnings("static-access")
-    public static void main(String args[]) throws IOException,
-            InterruptedException {
-
-        Options options = new Options();
-
-        options.addOption(OptionBuilder.withArgName("corehome")
-                                       .hasArg()
-                                       .withDescription("core home")
-                                       .create("c"));
-
-        options.addOption(OptionBuilder.withArgName("instanceid")
-                                       .hasArg()
-                                       .withDescription("instance id")
-                                       .create("i"));
-
-        options.addOption(OptionBuilder.withArgName("configtype")
-                                       .hasArg()
-                                       .withDescription("configuration type")
-                                       .create("t"));
-
-        options.addOption(OptionBuilder.withArgName("userconfig")
-                                       .hasArg()
-                                       .withDescription("user-defined legacy data adapter configuration file")
-                                       .create("d"));
-
-        CommandLineParser parser = new GnuParser();
-        CommandLine commandLine = null;
-
-        try {
-            commandLine = parser.parse(options, args);
-        } catch (ParseException pe) {
-            System.err.println(pe.getLocalizedMessage());
-            System.exit(1);
-        }
-
-        int instanceId = -1;
-        if (commandLine.hasOption("i")) {
-            String instanceIdStr = commandLine.getOptionValue("i");
-            try {
-                instanceId = Integer.parseInt(instanceIdStr);
-            } catch (NumberFormatException nfe) {
-                System.err.println("Bad instance id: %s" + instanceIdStr);
-                System.exit(1);
-            }
-        }
-
-        if (commandLine.hasOption("c")) {
-            coreHome = commandLine.getOptionValue("c");
-        }
-
-        String configType = "typical";
-        if (commandLine.hasOption("t")) {
-            configType = commandLine.getOptionValue("t");
-        }
-
-        String userConfigFilename = null;
-        if (commandLine.hasOption("d")) {
-            userConfigFilename = commandLine.getOptionValue("d");
-        }
-
-        File userConfigFile = new File(userConfigFilename);
-        if (!userConfigFile.isFile()) {
-            System.err.println("Bad user configuration file: "
-                    + userConfigFilename);
-            System.exit(1);
-        }
-
-        File coreHomeFile = new File(coreHome);
-        if (!coreHomeFile.isDirectory()) {
-            System.err.println("Bad core home: " + coreHome);
-            System.exit(1);
-        }
-
-        if (instanceId > -1) {
-            System.setProperty("instanceId", "" + instanceId);
-        } else {
-            System.setProperty("instanceId", "" + S4Util.getPID());
-        }
-
-        String configBase = coreHome + File.separatorChar + "conf"
-                + File.separatorChar + configType;
-        String configPath = configBase + File.separatorChar
-                + "client-adapter-conf.xml";
-        File configFile = new File(configPath);
-        if (!configFile.exists()) {
-            System.err.printf("adapter config file %s does not exist\n",
-                              configPath);
-            System.exit(1);
-        }
-
-        // load adapter config xml
-        ApplicationContext coreContext;
-        coreContext = new FileSystemXmlApplicationContext("file:" + configPath);
-        ApplicationContext context = coreContext;
-
-        Adapter adapter = (Adapter) context.getBean("client_adapter");
-
-        ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
-                                                                                    + userConfigFilename },
-                                                                            context);
-
-        Map<?, ?> inputStubBeanMap = appContext.getBeansOfType(InputStub.class);
-        Map<?, ?> outputStubBeanMap = appContext.getBeansOfType(OutputStub.class);
-
-        if (inputStubBeanMap.size() == 0 && outputStubBeanMap.size() == 0) {
-            System.err.println("No user-defined input/output stub beans");
-            System.exit(1);
-        }
-
-        ArrayList<InputStub> inputStubs = new ArrayList<InputStub>(inputStubBeanMap.size());
-        ArrayList<OutputStub> outputStubs = new ArrayList<OutputStub>(outputStubBeanMap.size());
-
-        // add all input stubs
-        for (Map.Entry<?, ?> e : inputStubBeanMap.entrySet()) {
-            String beanName = (String) e.getKey();
-            System.out.println("Adding InputStub " + beanName);
-            inputStubs.add((InputStub) e.getValue());
-        }
-
-        // add all output stubs
-        for (Map.Entry<?, ?> e : outputStubBeanMap.entrySet()) {
-            String beanName = (String) e.getKey();
-            System.out.println("Adding OutputStub " + beanName);
-            outputStubs.add((OutputStub) e.getValue());
-        }
-
-        adapter.setInputStubs(inputStubs);
-        adapter.setOutputStubs(outputStubs);
-
-    }
-
-    public static void clientTest() throws IOException, InterruptedException {
-        BasicConfigurator.configure();
-
-        TestDispatcher disp = new TestDispatcher();
-
-        Adapter adapter = new Adapter();
-        adapter.setDispatcher(disp);
-
-        GenericJsonClientStub stub = new GenericJsonClientStub();
-        stub.setConnectionPort(2334);
-
-        InputStub[] in = { stub };
-        OutputStub[] out = { stub };
-        adapter.setInputStubs(Arrays.asList(in));
-        adapter.setOutputStubs(Arrays.asList(out));
-
-        adapter.init();
-        stub.init();
-
-        while (true) {
-            Thread.sleep(10000);
-            TestReturnType r = new TestReturnType(100, 200);
-            adapter.eventReader.queueWork(new EventWrapper("TESTSTREAM",
-                                                           r,
-                                                           null));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientConnection.java b/s4-core/src/main/java/io/s4/client/ClientConnection.java
deleted file mode 100644
index fce2353..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientConnection.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package io.s4.client;
-
-import io.s4.collector.EventWrapper;
-import io.s4.message.Request;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.util.HashSet;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Connection to a client. A Stub has a collection of connections.
- */
-public class ClientConnection {
-    /**
-     * 
-     */
-    private final ClientStub clientStub;
-
-    /**
-     * TCP/IP socket used to communicate with client.
-     */
-    private final Socket socket;
-
-    public final IOChannel io;
-
-    public final ClientReadMode clientReadMode;
-    public final ClientWriteMode clientWriteMode;
-
-    /**
-     * GUID of client.
-     */
-    public final UUID uuid;
-
-    public ClientConnection(ClientStub clientStub, Socket socket, UUID uuid,
-            ClientReadMode clientReadMode, ClientWriteMode clientWriteMode)
-            throws IOException {
-        this.clientStub = clientStub;
-        this.uuid = uuid;
-        this.socket = socket;
-        this.io = this.clientStub.createIOChannel(socket);
-        this.clientReadMode = clientReadMode;
-        this.clientWriteMode = clientWriteMode;
-    }
-
-    public boolean good() {
-        return socket.isConnected();
-    }
-
-    public void close() {
-        synchronized (this.clientStub.clients) {
-            ClientStub.logger.info("closing connection to client " + uuid);
-            this.clientStub.clients.remove(this.uuid);
-        }
-
-        try {
-            socket.close();
-        } catch (IOException e) {
-            ClientStub.logger.error("problem closing client connection to client "
-                                            + uuid,
-                                    e);
-        }
-    }
-
-    private HashSet<String> includeStreams = new HashSet<String>();
-    private HashSet<String> excludeStreams = new HashSet<String>();
-
-    public void includeStreams(List<String> streams) {
-        includeStreams.addAll(streams);
-    }
-
-    public void excludeStreams(List<String> streams) {
-        excludeStreams.addAll(streams);
-    }
-
-    /**
-     * Stream is accepted if and only if:
-     * 
-     * (A) readMode is Select and: 1. indifferent to stream inclusion OR stream
-     * is included AND 2. indifferent to stream exclusion OR stream is not
-     * excluded.
-     * 
-     * OR (B) readMode is All
-     * 
-     * @return true if and only if stream is accepted
-     */
-    public boolean streamAccepted(String s) {
-        switch (clientReadMode) {
-            case None:
-            case Private:
-                return false;
-
-            case Select:
-                return (includeStreams.isEmpty() || includeStreams.contains(s))
-                        && (excludeStreams.isEmpty() || !excludeStreams.contains(s));
-
-            case All:
-                return true;
-
-            default:
-                return false;
-        }
-    }
-
-    private Thread receiverThread = null;
-
-    public void start() {
-        if (clientWriteMode == ClientWriteMode.Enabled)
-            (receiverThread = new Thread(receiver)).start();
-    }
-
-    public void stop() {
-        if (receiverThread != null) {
-            receiverThread.interrupt();
-            receiverThread = null;
-        }
-    }
-
-    public final Runnable receiver = new Runnable() {
-        public void run() {
-            try {
-                while (good()) {
-                    byte[] b = io.recv();
-
-                    // null, empty => goodbye
-                    if (b == null || b.length == 0) {
-                        ClientStub.logger.info("client session ended " + uuid);
-                        break;
-                    }
-
-                    EventWrapper ew = ClientConnection.this.clientStub.eventWrapperFromBytes(b);
-                    if (ew == null)
-                        continue;
-
-                    Object event = ew.getEvent();
-                    if (event instanceof Request) {
-                        decorateRequest((Request) event);
-                        ClientStub.logger.info("Decorated client request: "
-                                + ew.toString());
-                    }
-
-                    ClientConnection.this.clientStub.injectEvent(ew);
-                }
-            } catch (IOException e) {
-                ClientStub.logger.info("error while reading from client "
-                        + uuid, e);
-
-            } finally {
-                close();
-            }
-        }
-
-        private void decorateRequest(Request r) {
-            // add UUID of client into request.
-            Request.RInfo info = r.getRInfo();
-
-            if (info != null && info instanceof Request.ClientRInfo)
-                ((Request.ClientRInfo) info).setRequesterUUID(ClientConnection.this.uuid);
-        }
-    };
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientReadMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientReadMode.java b/s4-core/src/main/java/io/s4/client/ClientReadMode.java
deleted file mode 100644
index cf374a1..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientReadMode.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package io.s4.client;
-
-/**
- * Client mode.
- */
-public enum ClientReadMode {
-    None(false, false), Private(true, false), Select(true, true), All(true, true);
-
-    private final boolean priv;
-    private final boolean pub;
-
-    ClientReadMode(boolean priv, boolean pub) {
-        this.priv = priv;
-        this.pub = pub;
-    }
-
-    public boolean takePublic() {
-        return pub;
-    }
-
-    public boolean takePrivate() {
-        return priv;
-    }
-
-    public static ClientReadMode fromString(String s) {
-        if (s.equalsIgnoreCase("none"))
-            return None;
-        else if (s.equalsIgnoreCase("private"))
-            return Private;
-        else if (s.equalsIgnoreCase("select"))
-            return Select;
-        else if (s.equalsIgnoreCase("all"))
-            return All;
-        else
-            return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientStub.java b/s4-core/src/main/java/io/s4/client/ClientStub.java
deleted file mode 100644
index 65b9804..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientStub.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import io.s4.collector.EventWrapper;
-import io.s4.listener.EventHandler;
-import io.s4.message.Request;
-import io.s4.message.Response;
-import io.s4.util.ByteArrayIOChannel;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-public abstract class ClientStub implements OutputStub, InputStub {
-
-    protected static final Logger logger = Logger.getLogger("adapter");
-
-    /**
-     * Description of the protocol implemented by a concrete instance of this
-     * stub.
-     */
-    public static class Info {
-        public final String name;
-        public final int versionMajor;
-        public final int versionMinor;
-
-        public Info(String name, int versionMajor, int versionMinor) {
-            this.name = name;
-            this.versionMajor = versionMajor;
-            this.versionMinor = versionMinor;
-        }
-    }
-
-    /**
-     * Meta-information about the protocol that this stub uses to talk to
-     * external clients.
-     * 
-     * This is sent to the client as a part of the handshake.
-     */
-    abstract public Info getProtocolInfo();
-
-    /**
-     * Stream names that are accepted by this stub to be forwarded to its
-     * clients.
-     */
-    @Override
-    public List<String> getAcceptedStreams() {
-        return null;
-    }
-
-    private List<EventHandler> handlers = new ArrayList<EventHandler>();
-
-    /**
-     * A handler that can inject events produced by this stub into the S4
-     * cluster.
-     */
-    @Override
-    public void addHandler(EventHandler handler) {
-        this.handlers.add(handler);
-    }
-
-    /**
-     * Remove a handler.
-     */
-    @Override
-    public boolean removeHandler(EventHandler handler) {
-        return handlers.remove(handler);
-    }
-
-    /**
-     * Convert an array of bytes into an event wrapper. This method is used to
-     * translate data received from a client into events that may be injected
-     * into the S4 cluster.
-     * 
-     * @param v
-     *            array of bytes
-     * @return EventWrapper constructed from the byte array.
-     */
-    abstract public EventWrapper eventWrapperFromBytes(byte[] v);
-
-    /**
-     * Convert an event wrapper into a byte array. Events received from the S4
-     * cluster for dispatching to a client are translated into a byte array
-     * using this method.
-     * 
-     * @param e
-     *            an {@link EventWrapper}
-     * @return a byte array
-     */
-    abstract public byte[] bytesFromEventWrapper(EventWrapper e);
-
-    /**
-     * Construct an I/O channel over which the stub can communicate with a
-     * client. The channel allows arrys of bytes to be exchanged between the
-     * stub and client.
-     * 
-     * @param socket
-     *            TCP/IP socket
-     * @return an IO Channel to send and recv byte arrays
-     * @throws IOException
-     *             if the underlying socket could not provide valid input and
-     *             output streams.
-     */
-    public IOChannel createIOChannel(Socket socket) throws IOException {
-        return new ByteArrayIOChannel(socket);
-    }
-
-    // send an event into the cluster via adapter.
-    void injectEvent(EventWrapper e) {
-        for (EventHandler handler : handlers) {
-            handler.processEvent(e);
-        }
-    }
-
-    // private List<ClientConnection> clients = new
-    // ArrayList<ClientConnection>();
-    HashMap<UUID, ClientConnection> clients = new HashMap<UUID, ClientConnection>();
-
-    /**
-     * Create a client connection and add it to list of clients.
-     * 
-     * @param socket
-     *            client's I/O socket
-     */
-    private void addClient(ClientConnection c) {
-        synchronized (clients) {
-            logger.info("adding client " + c.uuid);
-            clients.put(c.uuid, c);
-        }
-    }
-
-    LinkedBlockingQueue<EventWrapper> queue = new LinkedBlockingQueue<EventWrapper>();
-
-    @Override
-    public int getQueueSize() {
-        return queue.size();
-    }
-
-    @Override
-    public void queueWork(EventWrapper e) {
-        queue.offer(e);
-    }
-
-    ServerSocket serverSocket = null;
-
-    public void setConnectionPort(int port) throws IOException {
-        serverSocket = new ServerSocket(port);
-    }
-
-    private Thread acceptThread = null;
-    private Thread senderThread = null;
-
-    public void init() {
-        // start accepting new clients and sending events to them
-        (acceptThread = new Thread(connectionListener)).start();
-        (senderThread = new Thread(sender)).start();
-    }
-
-    public void shutdown() {
-        // stop accepting new clients
-        if (acceptThread != null) {
-            acceptThread.interrupt();
-            acceptThread = null;
-        }
-
-        // stop sending events to them.
-        if (senderThread != null) {
-            senderThread.interrupt();
-            senderThread = null;
-        }
-
-        // stop all connected clients.
-        List<ClientConnection> clientCopy = new ArrayList<ClientConnection>(clients.values());
-        for (ClientConnection c : clientCopy) {
-            c.stop();
-            c.close();
-        }
-    }
-
-    private final Runnable connectionListener = new Runnable() {
-
-        Handshake handshake = null;
-
-        public void run() {
-            if (handshake == null)
-                handshake = new Handshake(ClientStub.this);
-
-            try {
-                while (serverSocket != null && serverSocket.isBound()
-                        && !Thread.currentThread().isInterrupted()) {
-
-                    Socket socket = serverSocket.accept();
-
-                    ClientConnection connection = handshake.execute(socket);
-
-                    if (connection != null) {
-                        addClient(connection);
-                        connection.start();
-                    }
-
-                }
-            } catch (IOException e) {
-                logger.info("exception in client connection listener", e);
-            }
-        }
-
-    };
-
-    public final Runnable sender = new Runnable() {
-        ArrayList<ClientConnection> disconnect = new ArrayList<ClientConnection>();
-
-        public void run() {
-
-            while (!Thread.currentThread().isInterrupted()) {
-                try {
-                    EventWrapper event = queue.take();
-
-                    // Responses need special handling.
-                    if (event.getEvent() instanceof Response) {
-                        dispatchResponse(event);
-                        continue;
-                    }
-
-                    // TODO: include check to see if the event belongs to a
-                    // particular client.
-
-                    dispatchToAllClients(event);
-
-                } catch (InterruptedException e) {
-                    return;
-                }
-            }
-        }
-
-        private void dispatchToAllClients(EventWrapper event) {
-
-            byte[] b = bytesFromEventWrapper(event);
-            String stream = event.getStreamName();
-            
-            synchronized (clients) {
-                for (ClientConnection c : clients.values()) {
-                    if (c.good() && c.streamAccepted(stream)) {
-                        try {
-                            c.io.send(b);
-
-                        } catch (IOException e) {
-                            logger.error("error sending message to client "
-                                    + c.uuid + ". disconnecting", e);
-
-                            disconnect.add(c);
-                        }
-                    }
-                }
-            }
-
-            if (disconnect.size() > 0) {
-                for (ClientConnection d : disconnect)
-                    d.close();
-
-                disconnect.clear();
-            }
-        }
-
-        private void dispatchResponse(EventWrapper event) {
-            Response res = (Response) event.getEvent();
-            Request.RInfo rinfo = res.getRInfo();
-
-            if (rinfo instanceof Request.ClientRInfo) {
-                UUID uuid = ((Request.ClientRInfo) rinfo).getRequesterUUID();
-
-                ClientConnection c = clients.get(uuid);
-
-                if (c != null && c.good() && c.clientReadMode.takePrivate()) {
-                    try {
-                        byte[] b = bytesFromEventWrapper(event);
-                        if (b != null) c.io.send(b);
-
-                    } catch (IOException e) {
-                        logger.error("error sending response to client "
-                                + c.uuid + ". disconnecting", e);
-
-                        c.close();
-                    }
-
-                } else {
-                    logger.warn("no active client found for response: " + res);
-                }
-            }
-        }
-    };
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientWriteMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientWriteMode.java b/s4-core/src/main/java/io/s4/client/ClientWriteMode.java
deleted file mode 100644
index f102363..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientWriteMode.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package io.s4.client;
-
-/**
- * Client's write mode.
- */
-public enum ClientWriteMode {
-    Enabled, Disabled;
-
-    public static ClientWriteMode fromString(String s) {
-        if (s.equalsIgnoreCase("enabled"))
-            return Enabled;
-        else if (s.equalsIgnoreCase("disabled"))
-            return Disabled;
-        else
-            return null;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java b/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java
deleted file mode 100644
index ffce5c6..0000000
--- a/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import io.s4.client.util.ObjectBuilder;
-import io.s4.collector.EventWrapper;
-import io.s4.util.GsonUtil;
-
-import java.nio.charset.Charset;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class GenericJsonClientStub extends ClientStub {
-
-    // private static final ObjectBuilder builder = new ObjectBuilder();
-    // private static final Gson builder = new Gson();
-
-    private static final Info protocolInfo = new Info("generic-json", 1, 0);
-
-    @Override
-    public Info getProtocolInfo() {
-        return protocolInfo;
-    }
-
-    @Override
-    public EventWrapper eventWrapperFromBytes(byte[] v) {
-        try {
-            // interpret v as a JSON string
-            String s = new String(v, Charset.forName("UTF8"));
-            JSONObject json = new JSONObject(s);
-
-            String streamName = json.getString("stream");
-            String className = json.getString("class");
-
-            Class<?> clazz;
-            try {
-                clazz = Class.forName(className);
-            } catch (ClassNotFoundException e) {
-                throw new ObjectBuilder.Exception("bad class name for json-encoded object: "
-                                                          + className,
-                                                  e);
-            }
-
-            String[] keyNames = null;
-            JSONArray keyArray = json.optJSONArray("keys");
-            if (keyArray != null) {
-                keyNames = new String[keyArray.length()];
-                for (int i = 0; i < keyNames.length; ++i) {
-                    keyNames[i] = keyArray.optString(i);
-                }
-            }
-
-            String jevent = json.getString("object");
-
-            Object obj = GsonUtil.get().fromJson(jevent, clazz);
-
-            return new EventWrapper(streamName, keyNames, obj);
-
-        } catch (JSONException e) {
-            logger.error("problem with event JSON", e);
-        } catch (ObjectBuilder.Exception e) {
-            logger.error("failed to build object from JSON", e);
-        }
-
-        return null;
-    }
-
-    @Override
-    public byte[] bytesFromEventWrapper(EventWrapper ew) {
-        JSONObject jevent = new JSONObject();
-
-        Object obj = ew.getEvent();
-
-        try {
-            jevent.put("stream", ew.getStreamName());
-            jevent.put("class", obj.getClass().getName());
-            jevent.put("object", GsonUtil.get().toJson(obj));
-
-            return jevent.toString().getBytes(Charset.forName("UTF8"));
-
-        } catch (JSONException e) {
-            logger.error("exception while converting event wrapper to bytes.",
-                         e);
-            return null;
-        } catch (IllegalArgumentException iae) {
-        	logger.error("exception while converting event wrapper to bytes.",
-                    iae);
-        	logger.error(obj);
-        	return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/Handshake.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/Handshake.java b/s4-core/src/main/java/io/s4/client/Handshake.java
deleted file mode 100644
index b93f94a..0000000
--- a/s4-core/src/main/java/io/s4/client/Handshake.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import io.s4.client.ClientStub.Info;
-import io.s4.util.ByteArrayIOChannel;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.gson.Gson;
-
-public class Handshake {
-    private final ClientStub clientStub;
-
-    protected static final Logger logger = Logger.getLogger("adapter");
-
-    protected final Gson gson = new Gson();
-
-    public Handshake(ClientStub clientStub) {
-        this.clientStub = clientStub;
-    }
-
-    // execute handshake with client.
-    // 1. discovery: issue uuid;
-    // 2. main connect: create connection
-    public ClientConnection execute(Socket s) {
-        byte[] v;
-
-        try {
-            ClientConnection conn = null;
-
-            ByteArrayIOChannel io = new ByteArrayIOChannel(s);
-
-            v = io.recv();
-
-            if (v == null || v.length == 0) {
-                // no information => client initialization
-                clientInit(io);
-
-            } else {
-                // some data => client connect
-                conn = clientConnect(v, io, s);
-            }
-
-            if (conn == null)
-                s.close();
-
-            return conn;
-
-        } catch (IOException e) {
-            logger.error("exception during handshake", e);
-            try {
-                s.close();
-                return null;
-
-            } catch (IOException ee) {
-                throw new RuntimeException("failed to close socket after failed handshake",
-                                           ee);
-            }
-        }
-    }
-
-    private ClientConnection clientConnect(byte[] v, ByteArrayIOChannel io,
-                                           Socket sock) throws IOException {
-
-        List<String> reason = new ArrayList<String>(1);
-        ClientConnection conn = clientConnectCreate(v, io, sock, reason);
-
-        String message = null;
-        try {
-            JSONObject resp = new JSONObject();
-
-            resp.put("status", (conn != null ? "ok" : "failed"));
-
-            if (conn == null && !reason.isEmpty()) {
-                resp.put("reason", reason.get(0));
-            }
-
-            message = resp.toString();
-
-        } catch (JSONException e) {
-            logger.error("error creating response during connect.", e);
-            return null;
-        }
-
-        io.send(message.getBytes());
-
-        return conn;
-    }
-
-    private ClientConnection clientConnectCreate(byte[] v,
-                                                 ByteArrayIOChannel io,
-                                                 Socket sock,
-                                                 List<String> reason)
-            throws IOException {
-
-        try {
-            JSONObject cInfo = new JSONObject(new String(v));
-
-            String s = cInfo.optString("uuid", "");
-            if (s.isEmpty()) {
-                logger.error("missing client identifier during handshake.");
-                reason.add("missing UUID");
-                return null;
-            }
-
-            UUID u = UUID.fromString(s);
-
-            logger.info("connecting to client " + u);
-
-            s = cInfo.optString("readMode", "Private");
-            ClientReadMode rmode = ClientReadMode.fromString(s);
-            if (rmode == null) {
-                logger.error(u + ": unknown readMode " + s);
-                reason.add("unknown readMode " + s);
-                return null;
-
-            }
-
-            s = cInfo.optString("writeMode", "Enabled");
-            ClientWriteMode wmode = ClientWriteMode.fromString(s);
-            if (wmode == null) {
-                logger.error(u + ": unknown writeMode " + s);
-                reason.add("unknown writeMode " + s);
-                return null;
-            }
-
-            logger.info(u + " read=" + rmode + " write=" + wmode);
-
-            if (rmode == ClientReadMode.None
-                    && wmode == ClientWriteMode.Disabled) {
-                // client cannot disable read AND write...
-                logger.error("client neither reads nor writes.");
-                reason.add("read and write disabled");
-
-                return null;
-            }
-
-            ClientConnection conn = new ClientConnection(clientStub, sock, u, rmode, wmode);
-
-            if (rmode == ClientReadMode.Select) {
-                JSONArray incl = cInfo.optJSONArray("readInclude");
-                JSONArray excl = cInfo.optJSONArray("readExclude");
-
-                if (incl == null && excl == null) {
-                    logger.error(u + ": missing stream selection information");
-                    reason.add("missing readInclude and readExclude");
-                    return null;
-                }
-
-                if (incl != null) {
-                    List<String> streams = new ArrayList<String>(incl.length());
-                    for (int i = 0; i < incl.length(); ++i)
-                        streams.add(incl.getString(i));
-                    
-                    conn.includeStreams(streams);
-                }
-                
-                if (excl != null) {
-                    List<String> streams = new ArrayList<String>(excl.length());
-                    for (int i = 0; i < excl.length(); ++i)
-                        streams.add(excl.getString(i));
-                    
-                    conn.excludeStreams(streams);
-                }
-                
-            }
-
-            return conn;
-            
-        } catch (JSONException e) {
-            logger.error("malformed JSON from client during handshake", e);
-            reason.add("malformed JSON");
-
-        } catch (NumberFormatException e) {
-            logger.error("received malformed UUID", e);
-            reason.add("malformed UUID");
-
-        } catch (IllegalArgumentException e) {
-            logger.error("received malformed UUID", e);
-            reason.add("malformed UUID");
-        }
-
-        return null;
-    }
-
-    private void clientInit(ByteArrayIOChannel io) throws IOException {
-        String uuid = UUID.randomUUID().toString();
-        Info proto = clientStub.getProtocolInfo();
-
-        HashMap<String, Object> info = new HashMap<String, Object>();
-        info.put("uuid", uuid);
-        info.put("protocol", proto);
-
-        String response = gson.toJson(info) + "\n";
-
-        io.send(response.getBytes());
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/IOChannel.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/IOChannel.java b/s4-core/src/main/java/io/s4/client/IOChannel.java
deleted file mode 100644
index e804b05..0000000
--- a/s4-core/src/main/java/io/s4/client/IOChannel.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import java.io.IOException;
-
-public interface IOChannel {
-    byte[] recv() throws IOException;
-
-    void send(byte[] v) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/InputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/InputStub.java b/s4-core/src/main/java/io/s4/client/InputStub.java
deleted file mode 100644
index a63ccb8..0000000
--- a/s4-core/src/main/java/io/s4/client/InputStub.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import io.s4.listener.EventProducer;
-
-public interface InputStub extends EventProducer {
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/OutputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/OutputStub.java b/s4-core/src/main/java/io/s4/client/OutputStub.java
deleted file mode 100644
index b68573d..0000000
--- a/s4-core/src/main/java/io/s4/client/OutputStub.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import io.s4.processor.AsynchronousEventProcessor;
-
-import java.util.List;
-
-public interface OutputStub extends AsynchronousEventProcessor {
-
-    List<String> getAcceptedStreams();
-
-}
\ No newline at end of file