You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:16 UTC
[29/50] [abbrv] git commit: Rename packages in preparation for move to Apache
Rename packages in preparation for move to Apache
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/a7b4afb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/a7b4afb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/a7b4afb0
Branch: refs/heads/dev
Commit: a7b4afb095b47d0692913d0e157ecef2d2167a83
Parents: 64a901f
Author: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Authored: Thu Nov 17 23:07:32 2011 -0800
Committer: Bruce Robbins <ro...@everychoose-lm.corp.yahoo.com>
Committed: Thu Nov 17 23:07:32 2011 -0800
----------------------------------------------------------------------
s4-core/src/main/java/io/s4/MainApp.java | 291 ------
s4-core/src/main/java/io/s4/adapter/Adapter.java | 208 ----
s4-core/src/main/java/io/s4/client/Adapter.java | 432 --------
.../main/java/io/s4/client/ClientConnection.java | 163 ---
.../src/main/java/io/s4/client/ClientReadMode.java | 37 -
s4-core/src/main/java/io/s4/client/ClientStub.java | 312 ------
.../main/java/io/s4/client/ClientWriteMode.java | 17 -
.../java/io/s4/client/GenericJsonClientStub.java | 107 --
s4-core/src/main/java/io/s4/client/Handshake.java | 223 -----
s4-core/src/main/java/io/s4/client/IOChannel.java | 24 -
s4-core/src/main/java/io/s4/client/InputStub.java | 21 -
s4-core/src/main/java/io/s4/client/OutputStub.java | 26 -
.../main/java/io/s4/client/util/ObjectBuilder.java | 200 ----
s4-core/src/main/java/io/s4/collector/Event.java | 117 ---
.../main/java/io/s4/collector/EventListener.java | 93 --
.../src/main/java/io/s4/collector/EventRecord.java | 189 ----
.../main/java/io/s4/collector/EventWrapper.java | 80 --
.../src/main/java/io/s4/dispatcher/Dispatcher.java | 227 -----
.../java/io/s4/dispatcher/EventDispatcher.java | 47 -
.../java/io/s4/dispatcher/MultiDispatcher.java | 49 -
.../s4/dispatcher/StreamExcludingDispatcher.java | 52 -
.../s4/dispatcher/StreamSelectingDispatcher.java | 52 -
.../partitioner/BroadcastPartitioner.java | 43 -
.../s4/dispatcher/partitioner/CompoundKeyInfo.java | 77 --
.../s4/dispatcher/partitioner/DefaultHasher.java | 27 -
.../dispatcher/partitioner/DefaultPartitioner.java | 351 -------
.../dispatcher/partitioner/DummyPartitioner.java | 33 -
.../s4/dispatcher/partitioner/HashAlgorithm.java | 160 ---
.../java/io/s4/dispatcher/partitioner/Hasher.java | 20 -
.../java/io/s4/dispatcher/partitioner/KeyInfo.java | 125 ---
.../partitioner/LoopbackPartitioner.java | 69 --
.../io/s4/dispatcher/partitioner/Partitioner.java | 22 -
.../partitioner/RoundRobinPartitioner.java | 60 --
.../partitioner/TestDefaultPartitioner.java | 107 --
.../partitioner/VariableKeyPartitioner.java | 24 -
.../io/s4/dispatcher/transformer/Transformer.java | 21 -
.../main/java/io/s4/emitter/CommLayerEmitter.java | 261 -----
.../src/main/java/io/s4/emitter/EventEmitter.java | 24 -
.../src/main/java/io/s4/ft/CheckpointingEvent.java | 53 -
.../io/s4/ft/DefaultFileSystemStateStorage.java | 242 -----
.../java/io/s4/ft/InitiateCheckpointingEvent.java | 35 -
.../io/s4/ft/LoggingStorageCallbackFactory.java | 54 -
s4-core/src/main/java/io/s4/ft/RecoveryEvent.java | 33 -
.../src/main/java/io/s4/ft/RedisStateStorage.java | 127 ---
s4-core/src/main/java/io/s4/ft/SafeKeeper.java | 257 -----
s4-core/src/main/java/io/s4/ft/SafeKeeperId.java | 119 ---
s4-core/src/main/java/io/s4/ft/SaveStateTask.java | 43 -
s4-core/src/main/java/io/s4/ft/StateStorage.java | 67 --
.../src/main/java/io/s4/ft/StorageCallback.java | 34 -
.../main/java/io/s4/ft/StorageCallbackFactory.java | 33 -
s4-core/src/main/java/io/s4/ft/package.html | 23 -
.../java/io/s4/listener/CommLayerListener.java | 279 ------
.../src/main/java/io/s4/listener/EventHandler.java | 23 -
.../main/java/io/s4/listener/EventListener.java | 23 -
.../main/java/io/s4/listener/EventProducer.java | 23 -
.../src/main/java/io/s4/logger/Log4jMonitor.java | 118 ---
s4-core/src/main/java/io/s4/logger/Monitor.java | 30 -
.../src/main/java/io/s4/logger/TraceMessage.java | 40 -
.../main/java/io/s4/message/PrototypeRequest.java | 111 --
s4-core/src/main/java/io/s4/message/Request.java | 181 ----
s4-core/src/main/java/io/s4/message/Response.java | 92 --
.../main/java/io/s4/message/SinglePERequest.java | 137 ---
.../main/java/io/s4/persist/ConMapPersister.java | 192 ----
.../main/java/io/s4/persist/DumpingPersister.java | 162 ---
.../main/java/io/s4/persist/HashMapPersister.java | 207 ----
s4-core/src/main/java/io/s4/persist/Persister.java | 186 ----
.../src/main/java/io/s4/processor/AbstractPE.java | 778 ---------------
.../java/io/s4/processor/AbstractWindowingPE.java | 196 ----
.../s4/processor/AsynchronousEventProcessor.java | 28 -
.../io/s4/processor/ControlEventProcessor.java | 79 --
.../src/main/java/io/s4/processor/EventAdvice.java | 38 -
s4-core/src/main/java/io/s4/processor/JoinPE.java | 194 ----
.../main/java/io/s4/processor/OutputFormatter.java | 20 -
.../java/io/s4/processor/OverloadDispatcher.java | 20 -
.../s4/processor/OverloadDispatcherGenerator.java | 313 ------
.../io/s4/processor/OverloadDispatcherSlot.java | 20 -
.../src/main/java/io/s4/processor/PEContainer.java | 431 --------
.../main/java/io/s4/processor/PrintEventPE.java | 31 -
.../java/io/s4/processor/PrototypeWrapper.java | 129 ---
.../src/main/java/io/s4/processor/ReroutePE.java | 104 --
.../java/io/s4/processor/SimpleCountingPE.java | 92 --
s4-core/src/main/java/io/s4/schema/Schema.java | 293 ------
.../main/java/io/s4/schema/SchemaContainer.java | 37 -
.../src/main/java/io/s4/schema/SchemaManager.java | 92 --
.../main/java/io/s4/serialize/KryoSerDeser.java | 75 --
.../io/s4/serialize/SerializerDeserializer.java | 22 -
.../java/io/s4/test/TestPersisterEventClock.java | 152 ---
.../java/io/s4/test/TestPersisterWallClock.java | 122 ---
.../main/java/io/s4/util/ByteArrayIOChannel.java | 94 --
s4-core/src/main/java/io/s4/util/Cloner.java | 20 -
.../src/main/java/io/s4/util/ClonerGenerator.java | 149 ---
.../java/io/s4/util/DoubleOutputFormatter.java | 44 -
s4-core/src/main/java/io/s4/util/GsonUtil.java | 74 --
s4-core/src/main/java/io/s4/util/KeyUtil.java | 59 --
.../src/main/java/io/s4/util/LoadGenerator.java | 640 ------------
.../src/main/java/io/s4/util/MethodInvoker.java | 124 ---
s4-core/src/main/java/io/s4/util/MetricsName.java | 51 -
.../src/main/java/io/s4/util/MiscConstants.java | 20 -
s4-core/src/main/java/io/s4/util/NumberUtils.java | 50 -
.../src/main/java/io/s4/util/PreprodLogger.java | 59 --
.../io/s4/util/ReverseDoubleOutputFormatter.java | 28 -
.../io/s4/util/ReverseIntegerOutputFormatter.java | 28 -
s4-core/src/main/java/io/s4/util/S4Util.java | 25 -
s4-core/src/main/java/io/s4/util/SlotUtils.java | 60 --
.../java/io/s4/util/ToStringOutputFormatter.java | 26 -
s4-core/src/main/java/io/s4/util/Watcher.java | 174 ----
s4-core/src/main/java/io/s4/util/clock/Clock.java | 25 -
.../java/io/s4/util/clock/ClockStreamsLoader.java | 47 -
.../main/java/io/s4/util/clock/DrivenClock.java | 87 --
.../src/main/java/io/s4/util/clock/EventClock.java | 74 --
.../main/java/io/s4/util/clock/TimerRequest.java | 46 -
.../src/main/java/io/s4/util/clock/WallClock.java | 41 -
s4-core/src/main/java/org/apache/s4/MainApp.java | 291 ++++++
.../main/java/org/apache/s4/adapter/Adapter.java | 208 ++++
.../main/java/org/apache/s4/client/Adapter.java | 432 ++++++++
.../org/apache/s4/client/ClientConnection.java | 163 +++
.../java/org/apache/s4/client/ClientReadMode.java | 37 +
.../main/java/org/apache/s4/client/ClientStub.java | 312 ++++++
.../java/org/apache/s4/client/ClientWriteMode.java | 17 +
.../apache/s4/client/GenericJsonClientStub.java | 107 ++
.../main/java/org/apache/s4/client/Handshake.java | 223 +++++
.../main/java/org/apache/s4/client/IOChannel.java | 24 +
.../main/java/org/apache/s4/client/InputStub.java | 21 +
.../main/java/org/apache/s4/client/OutputStub.java | 26 +
.../org/apache/s4/client/util/ObjectBuilder.java | 200 ++++
.../main/java/org/apache/s4/collector/Event.java | 117 +++
.../org/apache/s4/collector/EventListener.java | 93 ++
.../java/org/apache/s4/collector/EventRecord.java | 189 ++++
.../java/org/apache/s4/collector/EventWrapper.java | 80 ++
.../java/org/apache/s4/dispatcher/Dispatcher.java | 227 +++++
.../org/apache/s4/dispatcher/EventDispatcher.java | 47 +
.../org/apache/s4/dispatcher/MultiDispatcher.java | 49 +
.../s4/dispatcher/StreamExcludingDispatcher.java | 52 +
.../s4/dispatcher/StreamSelectingDispatcher.java | 52 +
.../partitioner/BroadcastPartitioner.java | 43 +
.../s4/dispatcher/partitioner/CompoundKeyInfo.java | 77 ++
.../s4/dispatcher/partitioner/DefaultHasher.java | 27 +
.../dispatcher/partitioner/DefaultPartitioner.java | 351 +++++++
.../dispatcher/partitioner/DummyPartitioner.java | 33 +
.../s4/dispatcher/partitioner/HashAlgorithm.java | 160 +++
.../apache/s4/dispatcher/partitioner/Hasher.java | 20 +
.../apache/s4/dispatcher/partitioner/KeyInfo.java | 125 +++
.../partitioner/LoopbackPartitioner.java | 69 ++
.../s4/dispatcher/partitioner/Partitioner.java | 22 +
.../partitioner/RoundRobinPartitioner.java | 60 ++
.../partitioner/TestDefaultPartitioner.java | 107 ++
.../partitioner/VariableKeyPartitioner.java | 24 +
.../s4/dispatcher/transformer/Transformer.java | 21 +
.../org/apache/s4/emitter/CommLayerEmitter.java | 261 +++++
.../java/org/apache/s4/emitter/EventEmitter.java | 24 +
.../java/org/apache/s4/ft/CheckpointingEvent.java | 53 +
.../s4/ft/DefaultFileSystemStateStorage.java | 242 +++++
.../apache/s4/ft/InitiateCheckpointingEvent.java | 35 +
.../s4/ft/LoggingStorageCallbackFactory.java | 54 +
.../main/java/org/apache/s4/ft/RecoveryEvent.java | 33 +
.../java/org/apache/s4/ft/RedisStateStorage.java | 127 +++
.../src/main/java/org/apache/s4/ft/SafeKeeper.java | 257 +++++
.../main/java/org/apache/s4/ft/SafeKeeperId.java | 119 +++
.../main/java/org/apache/s4/ft/SaveStateTask.java | 43 +
.../main/java/org/apache/s4/ft/StateStorage.java | 67 ++
.../java/org/apache/s4/ft/StorageCallback.java | 34 +
.../org/apache/s4/ft/StorageCallbackFactory.java | 33 +
.../src/main/java/org/apache/s4/ft/package.html | 23 +
.../org/apache/s4/listener/CommLayerListener.java | 279 ++++++
.../java/org/apache/s4/listener/EventHandler.java | 23 +
.../java/org/apache/s4/listener/EventListener.java | 23 +
.../java/org/apache/s4/listener/EventProducer.java | 23 +
.../java/org/apache/s4/logger/Log4jMonitor.java | 118 +++
.../main/java/org/apache/s4/logger/Monitor.java | 30 +
.../java/org/apache/s4/logger/TraceMessage.java | 40 +
.../org/apache/s4/message/PrototypeRequest.java | 111 ++
.../main/java/org/apache/s4/message/Request.java | 181 ++++
.../main/java/org/apache/s4/message/Response.java | 92 ++
.../org/apache/s4/message/SinglePERequest.java | 137 +++
.../org/apache/s4/persist/ConMapPersister.java | 192 ++++
.../org/apache/s4/persist/DumpingPersister.java | 162 +++
.../org/apache/s4/persist/HashMapPersister.java | 207 ++++
.../main/java/org/apache/s4/persist/Persister.java | 186 ++++
.../java/org/apache/s4/processor/AbstractPE.java | 778 +++++++++++++++
.../apache/s4/processor/AbstractWindowingPE.java | 196 ++++
.../s4/processor/AsynchronousEventProcessor.java | 28 +
.../apache/s4/processor/ControlEventProcessor.java | 79 ++
.../java/org/apache/s4/processor/EventAdvice.java | 38 +
.../main/java/org/apache/s4/processor/JoinPE.java | 194 ++++
.../org/apache/s4/processor/OutputFormatter.java | 20 +
.../apache/s4/processor/OverloadDispatcher.java | 20 +
.../s4/processor/OverloadDispatcherGenerator.java | 313 ++++++
.../s4/processor/OverloadDispatcherSlot.java | 20 +
.../java/org/apache/s4/processor/PEContainer.java | 431 ++++++++
.../java/org/apache/s4/processor/PrintEventPE.java | 31 +
.../org/apache/s4/processor/PrototypeWrapper.java | 129 +++
.../java/org/apache/s4/processor/ReroutePE.java | 104 ++
.../org/apache/s4/processor/SimpleCountingPE.java | 92 ++
.../src/main/java/org/apache/s4/schema/Schema.java | 293 ++++++
.../java/org/apache/s4/schema/SchemaContainer.java | 37 +
.../java/org/apache/s4/schema/SchemaManager.java | 92 ++
.../java/org/apache/s4/serialize/KryoSerDeser.java | 75 ++
.../s4/serialize/SerializerDeserializer.java | 22 +
.../apache/s4/test/TestPersisterEventClock.java | 152 +++
.../org/apache/s4/test/TestPersisterWallClock.java | 122 +++
.../org/apache/s4/util/ByteArrayIOChannel.java | 94 ++
.../src/main/java/org/apache/s4/util/Cloner.java | 20 +
.../java/org/apache/s4/util/ClonerGenerator.java | 149 +++
.../org/apache/s4/util/DoubleOutputFormatter.java | 44 +
.../src/main/java/org/apache/s4/util/GsonUtil.java | 74 ++
.../src/main/java/org/apache/s4/util/KeyUtil.java | 59 ++
.../java/org/apache/s4/util/LoadGenerator.java | 640 ++++++++++++
.../java/org/apache/s4/util/MethodInvoker.java | 124 +++
.../main/java/org/apache/s4/util/MetricsName.java | 51 +
.../java/org/apache/s4/util/MiscConstants.java | 20 +
.../main/java/org/apache/s4/util/NumberUtils.java | 50 +
.../java/org/apache/s4/util/PreprodLogger.java | 59 ++
.../s4/util/ReverseDoubleOutputFormatter.java | 28 +
.../s4/util/ReverseIntegerOutputFormatter.java | 28 +
.../src/main/java/org/apache/s4/util/S4Util.java | 25 +
.../main/java/org/apache/s4/util/SlotUtils.java | 60 ++
.../apache/s4/util/ToStringOutputFormatter.java | 26 +
.../src/main/java/org/apache/s4/util/Watcher.java | 174 ++++
.../main/java/org/apache/s4/util/clock/Clock.java | 25 +
.../apache/s4/util/clock/ClockStreamsLoader.java | 47 +
.../java/org/apache/s4/util/clock/DrivenClock.java | 87 ++
.../java/org/apache/s4/util/clock/EventClock.java | 74 ++
.../org/apache/s4/util/clock/TimerRequest.java | 46 +
.../java/org/apache/s4/util/clock/WallClock.java | 41 +
224 files changed, 12626 insertions(+), 12626 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/MainApp.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/MainApp.java b/s4-core/src/main/java/io/s4/MainApp.java
deleted file mode 100644
index 78c195b..0000000
--- a/s4-core/src/main/java/io/s4/MainApp.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4;
-
-import io.s4.ft.SafeKeeper;
-import io.s4.processor.AbstractPE;
-import io.s4.processor.PEContainer;
-import io.s4.util.S4Util;
-import io.s4.util.Watcher;
-import io.s4.util.clock.Clock;
-import io.s4.util.clock.EventClock;
-
-import java.io.File;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-import org.springframework.core.io.ClassPathResource;
-
-
-public class MainApp {
-
- private static String coreHome = "../s4-core";
- private static String appsHome = "../s4-apps";
- private static String extsHome = "../s4-exts";
-
- public static void main(String args[]) throws Exception {
- Options options = new Options();
-
- options.addOption(OptionBuilder.withArgName("corehome")
- .hasArg()
- .withDescription("core home")
- .create("c"));
-
- options.addOption(OptionBuilder.withArgName("appshome")
- .hasArg()
- .withDescription("applications home")
- .create("a"));
-
- options.addOption(OptionBuilder.withArgName("s4clock")
- .hasArg()
- .withDescription("s4 clock")
- .create("d"));
-
- options.addOption(OptionBuilder.withArgName("seedtime")
- .hasArg()
- .withDescription("event clock initialization time")
- .create("s"));
-
- options.addOption(OptionBuilder.withArgName("extshome")
- .hasArg()
- .withDescription("extensions home")
- .create("e"));
-
- options.addOption(OptionBuilder.withArgName("instanceid")
- .hasArg()
- .withDescription("instance id")
- .create("i"));
-
- options.addOption(OptionBuilder.withArgName("configtype")
- .hasArg()
- .withDescription("configuration type")
- .create("t"));
-
- CommandLineParser parser = new GnuParser();
- CommandLine commandLine = null;
- String clockType = "wall";
-
- try {
- commandLine = parser.parse(options, args);
- } catch (ParseException pe) {
- System.err.println(pe.getLocalizedMessage());
- System.exit(1);
- }
-
- int instanceId = -1;
- if (commandLine.hasOption("i")) {
- String instanceIdStr = commandLine.getOptionValue("i");
- try {
- instanceId = Integer.parseInt(instanceIdStr);
- } catch (NumberFormatException nfe) {
- System.err.println("Bad instance id: %s" + instanceIdStr);
- System.exit(1);
- }
- }
-
- if (commandLine.hasOption("c")) {
- coreHome = commandLine.getOptionValue("c");
- }
-
- if (commandLine.hasOption("a")) {
- appsHome = commandLine.getOptionValue("a");
- }
-
- if (commandLine.hasOption("d")) {
- clockType = commandLine.getOptionValue("d");
- }
-
- if (commandLine.hasOption("e")) {
- extsHome = commandLine.getOptionValue("e");
- }
-
- String configType = "typical";
- if (commandLine.hasOption("t")) {
- configType = commandLine.getOptionValue("t");
- }
-
- long seedTime = 0;
- if (commandLine.hasOption("s")) {
- seedTime = Long.parseLong(commandLine.getOptionValue("s"));
- }
-
- File coreHomeFile = new File(coreHome);
- if (!coreHomeFile.isDirectory()) {
- System.err.println("Bad core home: " + coreHome);
- System.exit(1);
- }
-
- File appsHomeFile = new File(appsHome);
- if (!appsHomeFile.isDirectory()) {
- System.err.println("Bad applications home: " + appsHome);
- System.exit(1);
- }
-
- if (instanceId > -1) {
- System.setProperty("instanceId", "" + instanceId);
- } else {
- System.setProperty("instanceId", "" + S4Util.getPID());
- }
-
- List loArgs = commandLine.getArgList();
-
- if (loArgs.size() < 1) {
- // System.err.println("No bean configuration file specified");
- // System.exit(1);
- }
-
- // String s4ConfigXml = (String) loArgs.get(0);
- // System.out.println("s4ConfigXml is " + s4ConfigXml);
-
- ClassPathResource propResource = new ClassPathResource("s4-core.properties");
- Properties prop = new Properties();
- if (propResource.exists()) {
- prop.load(propResource.getInputStream());
- } else {
- System.err.println("Unable to find s4-core.properties. It must be available in classpath");
- System.exit(1);
- }
-
- ApplicationContext coreContext = null;
- String configBase = coreHome + File.separatorChar + "conf"
- + File.separatorChar + configType;
- String configPath = "";
- List<String> coreConfigUrls = new ArrayList<String>();
- File configFile = null;
-
- // load clock configuration
- configPath = configBase + File.separatorChar + clockType + "-clock.xml";
- coreConfigUrls.add(configPath);
-
- // load core config xml
- configPath = configBase + File.separatorChar + "s4-core-conf.xml";
- configFile = new File(configPath);
- if (!configFile.exists()) {
- System.err.printf("S4 core config file %s does not exist\n",
- configPath);
- System.exit(1);
- }
-
- coreConfigUrls.add(configPath);
- String[] coreConfigFiles = new String[coreConfigUrls.size()];
- coreConfigUrls.toArray(coreConfigFiles);
-
- String[] coreConfigFileUrls = new String[coreConfigFiles.length];
- for (int i = 0; i < coreConfigFiles.length; i++) {
- coreConfigFileUrls[i] = "file:" + coreConfigFiles[i];
- }
-
- coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls, coreContext);
- ApplicationContext context = coreContext;
-
- Clock clock = (Clock) context.getBean("clock");
- if (clock instanceof EventClock && seedTime > 0) {
- EventClock s4EventClock = (EventClock)clock;
- s4EventClock.updateTime(seedTime);
- System.out.println("Intializing event clock time with seed time " + s4EventClock.getCurrentTime());
- }
-
- PEContainer peContainer = (PEContainer) context.getBean("peContainer");
-
- Watcher w = (Watcher) context.getBean("watcher");
- w.setConfigFilename(configPath);
-
-
- // load extension modules
- String[] configFileNames = getModuleConfigFiles(extsHome, prop);
- if (configFileNames.length > 0) {
- String[] configFileUrls = new String[configFileNames.length];
- for (int i = 0; i < configFileNames.length; i++) {
- configFileUrls[i] = "file:" + configFileNames[i];
- }
- context = new FileSystemXmlApplicationContext(configFileUrls,
- context);
- }
-
- // load application modules
- configFileNames = getModuleConfigFiles(appsHome, prop);
- if (configFileNames.length > 0) {
- String[] configFileUrls = new String[configFileNames.length];
- for (int i = 0; i < configFileNames.length; i++) {
- configFileUrls[i] = "file:" + configFileNames[i];
- }
- context = new FileSystemXmlApplicationContext(configFileUrls,
- context);
- // attach any beans that implement ProcessingElement to the PE
- // Container
- String[] processingElementBeanNames = context.getBeanNamesForType(AbstractPE.class);
- for (String processingElementBeanName : processingElementBeanNames) {
- AbstractPE bean = (AbstractPE) context.getBean(processingElementBeanName);
- bean.setClock(clock);
- try {
- bean.setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
- } catch (NoSuchBeanDefinitionException ignored) {
- // no safe keeper = no checkpointing / recovery
- }
- // if the application did not specify an id, use the Spring bean name
- if (bean.getId() == null) {
- bean.setId(processingElementBeanName);
- }
- System.out.println("Adding processing element with bean name "
- + processingElementBeanName + ", id "
- + ((AbstractPE) bean).getId());
- peContainer.addProcessor((AbstractPE) bean);
- }
- }
- }
-
- /**
- *
- * @param prop
- * @return
- */
- private static String[] getModuleConfigFiles(String moduleBase, Properties prop) {
- List<String> configFileList = new ArrayList<String>();
- File moduleBaseFile = new File(moduleBase);
-
- // list applications
- File[] moduleDirs = moduleBaseFile.listFiles();
- for (File moduleDir : moduleDirs) {
- if (moduleDir.isDirectory()) {
- String confFileName = moduleDir.getAbsolutePath() + "/"
- + moduleDir.getName() + "-conf.xml";
- File appsConfFile = new File(confFileName);
- if (appsConfFile.exists()) {
- configFileList.add(appsConfFile.getAbsolutePath());
- } else {
- System.err.println("Invalid application: " + moduleDir);
- }
- }
- }
- String[] ret = new String[configFileList.size()];
- configFileList.toArray(ret);
- System.out.println(Arrays.toString(ret));
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/adapter/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/adapter/Adapter.java b/s4-core/src/main/java/io/s4/adapter/Adapter.java
deleted file mode 100644
index c31ee97..0000000
--- a/s4-core/src/main/java/io/s4/adapter/Adapter.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.adapter;
-
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.listener.EventHandler;
-import io.s4.listener.EventListener;
-import io.s4.listener.EventProducer;
-import io.s4.util.S4Util;
-
-import java.io.File;
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.Logger;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-
-public class Adapter implements EventHandler {
- private static String coreHome = "../s4_core";
-
- private EventDispatcher dispatcher;
- private EventProducer[] eventListeners;
- private String configFilename;
-
- public void setDispatcher(EventDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
-
- public void setEventListeners(EventProducer[] eventListeners) {
- this.eventListeners = eventListeners;
- for (EventProducer eventListener : eventListeners) {
- eventListener.addHandler(this);
- }
- }
-
- public void setConfigFilename(String configFilename) {
- this.configFilename = configFilename;
- }
-
- private volatile int eventCount = 0;
- private volatile int rawEventCount = 0;
-
- public Adapter() {
-
- }
-
- int counts[];
-
- private boolean init = false;
-
- public void init() {
- synchronized (this) {
- init = true;
- }
- }
-
- public void processEvent(EventWrapper eventWrapper) {
- try {
- synchronized (this) {
- if (!init) {
- return;
- }
- rawEventCount++;
- eventCount++;
- }
- dispatcher.dispatchEvent(eventWrapper.getStreamName(),
- eventWrapper.getEvent());
- } catch (Exception e) {
- Logger.getLogger("dispatcher").info("Exception adapting event", e);
- }
- }
-
- public static void main(String args[]) {
- Options options = new Options();
-
- options.addOption(OptionBuilder.withArgName("corehome")
- .hasArg()
- .withDescription("core home")
- .create("c"));
-
- options.addOption(OptionBuilder.withArgName("instanceid")
- .hasArg()
- .withDescription("instance id")
- .create("i"));
-
- options.addOption(OptionBuilder.withArgName("configtype")
- .hasArg()
- .withDescription("configuration type")
- .create("t"));
-
- options.addOption(OptionBuilder.withArgName("userconfig")
- .hasArg()
- .withDescription("user-defined legacy data adapter configuration file")
- .create("d"));
-
- CommandLineParser parser = new GnuParser();
- CommandLine commandLine = null;
-
- try {
- commandLine = parser.parse(options, args);
- } catch (ParseException pe) {
- System.err.println(pe.getLocalizedMessage());
- System.exit(1);
- }
-
- int instanceId = -1;
- if (commandLine.hasOption("i")) {
- String instanceIdStr = commandLine.getOptionValue("i");
- try {
- instanceId = Integer.parseInt(instanceIdStr);
- } catch (NumberFormatException nfe) {
- System.err.println("Bad instance id: %s" + instanceIdStr);
- System.exit(1);
- }
- }
-
- if (commandLine.hasOption("c")) {
- coreHome = commandLine.getOptionValue("c");
- }
-
- String configType = "typical";
- if (commandLine.hasOption("t")) {
- configType = commandLine.getOptionValue("t");
- }
-
- String userConfigFilename = null;
- if (commandLine.hasOption("d")) {
- userConfigFilename = commandLine.getOptionValue("d");
- }
-
- File userConfigFile = new File(userConfigFilename);
- if (!userConfigFile.isFile()) {
- System.err.println("Bad user configuration file: "
- + userConfigFilename);
- System.exit(1);
- }
-
- File coreHomeFile = new File(coreHome);
- if (!coreHomeFile.isDirectory()) {
- System.err.println("Bad core home: " + coreHome);
- System.exit(1);
- }
-
- if (instanceId > -1) {
- System.setProperty("instanceId", "" + instanceId);
- } else {
- System.setProperty("instanceId", "" + S4Util.getPID());
- }
-
- String configBase = coreHome + File.separatorChar + "conf"
- + File.separatorChar + configType;
- String configPath = configBase + File.separatorChar
- + "adapter-conf.xml";
- File configFile = new File(configPath);
- if (!configFile.exists()) {
- System.err.printf("adapter config file %s does not exist\n",
- configPath);
- System.exit(1);
- }
-
- // load adapter config xml
- ApplicationContext coreContext;
- coreContext = new FileSystemXmlApplicationContext("file:" + configPath);
- ApplicationContext context = coreContext;
-
- Adapter adapter = (Adapter) context.getBean("adapter");
-
- ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
- + userConfigFilename },
- context);
- Map listenerBeanMap = appContext.getBeansOfType(EventProducer.class);
- if (listenerBeanMap.size() == 0) {
- System.err.println("No user-defined listener beans");
- System.exit(1);
- }
- EventProducer[] eventListeners = new EventProducer[listenerBeanMap.size()];
-
- int index = 0;
- for (Iterator it = listenerBeanMap.keySet().iterator(); it.hasNext(); index++) {
- String beanName = (String) it.next();
- System.out.println("Adding producer " + beanName);
- eventListeners[index] = (EventProducer) listenerBeanMap.get(beanName);
- }
-
- adapter.setEventListeners(eventListeners);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/Adapter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/Adapter.java b/s4-core/src/main/java/io/s4/client/Adapter.java
deleted file mode 100644
index fb9c770..0000000
--- a/s4-core/src/main/java/io/s4/client/Adapter.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.client;
-
-import io.s4.collector.EventListener;
-import io.s4.collector.EventWrapper;
-import io.s4.dispatcher.EventDispatcher;
-import io.s4.listener.EventHandler;
-import io.s4.message.Request;
-import io.s4.processor.AsynchronousEventProcessor;
-import io.s4.util.S4Util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.Logger;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.support.FileSystemXmlApplicationContext;
-
-/**
- * Adapter to connect an S4 cluster to an external client to read from and write
- * into the S4 cluster.
- */
-public class Adapter {
- private static String coreHome = "../s4_core";
-
- // Accept events from clients and send into S4 cluster.
- private EventDispatcher dispatcher;
- private Writer eventWriter = new Writer();
-
- // Listen to events from S4 cluster and send to clients.
- private io.s4.collector.EventListener clusterEventListener;
- private Reader eventReader = new Reader();
-
- /**
- * Set the dispatcher to use for sending events into S4.
- *
- * @param dispatcher
- */
- public void setDispatcher(EventDispatcher dispatcher) {
- this.dispatcher = dispatcher;
- }
-
- /**
- * clusterEventListener receives events from S4 cluster. Processing of
- * events is delegated to the eventReader, which typically forwards them to
- * appropriate clients.
- *
- * @param clusterEventListener
- */
- public void setClusterEventListener(EventListener clusterEventListener) {
- this.clusterEventListener = clusterEventListener;
- this.clusterEventListener.setEventProcessor(eventReader);
- }
-
- public Adapter() {
- }
-
- int counts[];
-
- private boolean init = false;
-
- public void init() {
- synchronized (this) {
- init = true;
- }
- }
-
- /**
- * Register a list of InputStubs that will send events into the S4 cluster.
- * These events will be processed by the eventWriter: i.e. the eventWriter
- * dispatches them into the S4 cluster.
- *
- * @param stubs
- */
- public void setInputStubs(List<InputStub> stubs) {
- for (InputStub stub : stubs) {
- // register the writer as the handler for the stub's events
- stub.addHandler(eventWriter);
- }
- }
-
- // collections of eventReceivers (OutputStubs) to which events will be sent
- // for forwarding to clients.
- HashMap<String, List<OutputStub>> eventReceivers = new HashMap<String, List<OutputStub>>();
- List<OutputStub> eventReceiversAny = new ArrayList<OutputStub>();
- List<OutputStub> eventReceiversAll = new ArrayList<OutputStub>();
-
- /**
- * Register a list of OutputStubs which process events received from the S4
- * cluster, typically forwarding them to clients.
- *
- * The {@link OutputStub#getAcceptedStreams()} is called to determine which
- * streams the stub is interested in receiving. Accordingly, three
- * collections of stubs are created.
- *
- * <ol>
- * <li>A mapping from stream name to OutputStubs (One-to-many). Used for
- * routing.</li>
- * <li>A list of OutputStubs that accept all streams (
- * {@code stub.getAcceptedStreams() == null}. Used for routing.</li>
- * <li>A list of all OutputStubs. This is used to iterate over all the stubs
- * exactly once.</li>
- * </ol>
- *
- * @param stubs
- * the list of output stubs.
- */
- public void setOutputStubs(List<OutputStub> stubs) {
- eventReceiversAll.addAll(stubs);
-
- for (OutputStub stub : stubs) {
- // update mapping of stream names to stubs that accept events on
- // that stream.
- List<String> streams = stub.getAcceptedStreams();
- if (streams != null) {
- for (String stream : streams) {
- List<OutputStub> stubList = eventReceivers.get(stream);
- if (stubList == null) {
- stubList = new ArrayList<OutputStub>();
- eventReceivers.put(stream, stubList);
- }
-
- stubList.add(stub);
- }
- } else {
- eventReceiversAny.add(stub);
- }
- }
- }
-
- /**
- * Write events from input stubs into S4 cluster.
- */
- private class Writer implements EventHandler {
- // events to be dispatched into cluster
- public void processEvent(EventWrapper eventWrapper) {
- try {
- synchronized (this) {
- if (!init) {
- return;
- }
- rawEventCount++;
- eventCount++;
- }
-
- // null keys => round-robin
- // empty key list => default partitioning of underlying
- // partitioner.
- List<List<String>> keys = eventWrapper.getCompoundKeyNames();
-
- String stream = eventWrapper.getStreamName();
-
- Object event = eventWrapper.getEvent();
-
- if (event instanceof Request)
- decorateRequest((Request) event);
-
- dispatcher.dispatchEvent(stream, keys, event);
-
- } catch (Exception e) {
- Logger.getLogger("adapter").info("Exception adapting event", e);
- }
- }
-
- private volatile int eventCount = 0;
- private volatile int rawEventCount = 0;
-
- // set the return stream name to the adapter cluster name
- private void decorateRequest(Request r) {
- Request.RInfo rinfo = r.getRInfo();
-
- if (rinfo instanceof Request.ClientRInfo) {
- String cname = clusterEventListener.getRawListener()
- .getAppName();
-
- ((Request.ClientRInfo) rinfo).setStream("@" + cname);
- }
- }
- }
-
- /**
- * Read events from S4 cluster.
- */
- private class Reader implements AsynchronousEventProcessor {
- /**
- * Queue work for processing by OutputStubs. This method simply queues
- * the events in the appropriate stubs. An event from stream K is queued
- * in OutputStub S if either S accepts all streams, or K is contained in
- * the list {@code S.getAcceptedStreams()}.
- */
- @Override
- public void queueWork(EventWrapper eventWrapper) {
- List<OutputStub> stubs = eventReceivers.get(eventWrapper.getStreamName());
-
- // stubs that accept any stream
- for (OutputStub stub : eventReceiversAny)
- stub.queueWork(eventWrapper);
-
- // stubs that receive this stream in particular
- if (stubs != null)
- for (OutputStub stub : stubs)
- stub.queueWork(eventWrapper);
- }
-
- @Override
- public int getQueueSize() {
- int sz = 0;
-
- for (OutputStub stub : eventReceiversAll)
- sz += stub.getQueueSize();
-
- return sz;
- }
-
- }
-
- private static class TestDispatcher implements EventDispatcher {
- @Override
- public void dispatchEvent(String s, Object e) {
- System.out.println("Dispatching event: " + s + ":" + e);
- }
-
- @Override
- public void dispatchEvent(String s, List<List<String>> k, Object e) {
- System.out.println("Dispatching event: " + s + ":" + k + ":" + e);
- }
- }
-
- private static class TestReturnType {
- private int ra;
- private int rb;
-
- @SuppressWarnings("unused")
- TestReturnType() {
- }
-
- TestReturnType(int a, int b) {
- this.ra = a;
- this.rb = b;
- }
-
- public String toString() {
- return "ra=" + ra + " rb=" + rb;
- }
- }
-
- @SuppressWarnings("static-access")
- public static void main(String args[]) throws IOException,
- InterruptedException {
-
- Options options = new Options();
-
- options.addOption(OptionBuilder.withArgName("corehome")
- .hasArg()
- .withDescription("core home")
- .create("c"));
-
- options.addOption(OptionBuilder.withArgName("instanceid")
- .hasArg()
- .withDescription("instance id")
- .create("i"));
-
- options.addOption(OptionBuilder.withArgName("configtype")
- .hasArg()
- .withDescription("configuration type")
- .create("t"));
-
- options.addOption(OptionBuilder.withArgName("userconfig")
- .hasArg()
- .withDescription("user-defined legacy data adapter configuration file")
- .create("d"));
-
- CommandLineParser parser = new GnuParser();
- CommandLine commandLine = null;
-
- try {
- commandLine = parser.parse(options, args);
- } catch (ParseException pe) {
- System.err.println(pe.getLocalizedMessage());
- System.exit(1);
- }
-
- int instanceId = -1;
- if (commandLine.hasOption("i")) {
- String instanceIdStr = commandLine.getOptionValue("i");
- try {
- instanceId = Integer.parseInt(instanceIdStr);
- } catch (NumberFormatException nfe) {
- System.err.println("Bad instance id: %s" + instanceIdStr);
- System.exit(1);
- }
- }
-
- if (commandLine.hasOption("c")) {
- coreHome = commandLine.getOptionValue("c");
- }
-
- String configType = "typical";
- if (commandLine.hasOption("t")) {
- configType = commandLine.getOptionValue("t");
- }
-
- String userConfigFilename = null;
- if (commandLine.hasOption("d")) {
- userConfigFilename = commandLine.getOptionValue("d");
- }
-
- File userConfigFile = new File(userConfigFilename);
- if (!userConfigFile.isFile()) {
- System.err.println("Bad user configuration file: "
- + userConfigFilename);
- System.exit(1);
- }
-
- File coreHomeFile = new File(coreHome);
- if (!coreHomeFile.isDirectory()) {
- System.err.println("Bad core home: " + coreHome);
- System.exit(1);
- }
-
- if (instanceId > -1) {
- System.setProperty("instanceId", "" + instanceId);
- } else {
- System.setProperty("instanceId", "" + S4Util.getPID());
- }
-
- String configBase = coreHome + File.separatorChar + "conf"
- + File.separatorChar + configType;
- String configPath = configBase + File.separatorChar
- + "client-adapter-conf.xml";
- File configFile = new File(configPath);
- if (!configFile.exists()) {
- System.err.printf("adapter config file %s does not exist\n",
- configPath);
- System.exit(1);
- }
-
- // load adapter config xml
- ApplicationContext coreContext;
- coreContext = new FileSystemXmlApplicationContext("file:" + configPath);
- ApplicationContext context = coreContext;
-
- Adapter adapter = (Adapter) context.getBean("client_adapter");
-
- ApplicationContext appContext = new FileSystemXmlApplicationContext(new String[] { "file:"
- + userConfigFilename },
- context);
-
- Map<?, ?> inputStubBeanMap = appContext.getBeansOfType(InputStub.class);
- Map<?, ?> outputStubBeanMap = appContext.getBeansOfType(OutputStub.class);
-
- if (inputStubBeanMap.size() == 0 && outputStubBeanMap.size() == 0) {
- System.err.println("No user-defined input/output stub beans");
- System.exit(1);
- }
-
- ArrayList<InputStub> inputStubs = new ArrayList<InputStub>(inputStubBeanMap.size());
- ArrayList<OutputStub> outputStubs = new ArrayList<OutputStub>(outputStubBeanMap.size());
-
- // add all input stubs
- for (Map.Entry<?, ?> e : inputStubBeanMap.entrySet()) {
- String beanName = (String) e.getKey();
- System.out.println("Adding InputStub " + beanName);
- inputStubs.add((InputStub) e.getValue());
- }
-
- // add all output stubs
- for (Map.Entry<?, ?> e : outputStubBeanMap.entrySet()) {
- String beanName = (String) e.getKey();
- System.out.println("Adding OutputStub " + beanName);
- outputStubs.add((OutputStub) e.getValue());
- }
-
- adapter.setInputStubs(inputStubs);
- adapter.setOutputStubs(outputStubs);
-
- }
-
- public static void clientTest() throws IOException, InterruptedException {
- BasicConfigurator.configure();
-
- TestDispatcher disp = new TestDispatcher();
-
- Adapter adapter = new Adapter();
- adapter.setDispatcher(disp);
-
- GenericJsonClientStub stub = new GenericJsonClientStub();
- stub.setConnectionPort(2334);
-
- InputStub[] in = { stub };
- OutputStub[] out = { stub };
- adapter.setInputStubs(Arrays.asList(in));
- adapter.setOutputStubs(Arrays.asList(out));
-
- adapter.init();
- stub.init();
-
- while (true) {
- Thread.sleep(10000);
- TestReturnType r = new TestReturnType(100, 200);
- adapter.eventReader.queueWork(new EventWrapper("TESTSTREAM",
- r,
- null));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientConnection.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientConnection.java b/s4-core/src/main/java/io/s4/client/ClientConnection.java
deleted file mode 100644
index fce2353..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientConnection.java
+++ /dev/null
@@ -1,163 +0,0 @@
-package io.s4.client;
-
-import io.s4.collector.EventWrapper;
-import io.s4.message.Request;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.util.HashSet;
-import java.util.List;
-import java.util.UUID;
-
-/**
- * Connection to a client. A Stub has a collection of connections.
- */
-public class ClientConnection {
- /**
- *
- */
- private final ClientStub clientStub;
-
- /**
- * TCP/IP socket used to communicate with client.
- */
- private final Socket socket;
-
- public final IOChannel io;
-
- public final ClientReadMode clientReadMode;
- public final ClientWriteMode clientWriteMode;
-
- /**
- * GUID of client.
- */
- public final UUID uuid;
-
- public ClientConnection(ClientStub clientStub, Socket socket, UUID uuid,
- ClientReadMode clientReadMode, ClientWriteMode clientWriteMode)
- throws IOException {
- this.clientStub = clientStub;
- this.uuid = uuid;
- this.socket = socket;
- this.io = this.clientStub.createIOChannel(socket);
- this.clientReadMode = clientReadMode;
- this.clientWriteMode = clientWriteMode;
- }
-
- public boolean good() {
- return socket.isConnected();
- }
-
- public void close() {
- synchronized (this.clientStub.clients) {
- ClientStub.logger.info("closing connection to client " + uuid);
- this.clientStub.clients.remove(this.uuid);
- }
-
- try {
- socket.close();
- } catch (IOException e) {
- ClientStub.logger.error("problem closing client connection to client "
- + uuid,
- e);
- }
- }
-
- private HashSet<String> includeStreams = new HashSet<String>();
- private HashSet<String> excludeStreams = new HashSet<String>();
-
- public void includeStreams(List<String> streams) {
- includeStreams.addAll(streams);
- }
-
- public void excludeStreams(List<String> streams) {
- excludeStreams.addAll(streams);
- }
-
- /**
- * Stream is accepted if and only if:
- *
- * (A) readMode is Select and: 1. indifferent to stream inclusion OR stream
- * is included AND 2. indifferent to stream exclusion OR stream is not
- * excluded.
- *
- * OR (B) readMode is All
- *
- * @return true if and only if stream is accepted
- */
- public boolean streamAccepted(String s) {
- switch (clientReadMode) {
- case None:
- case Private:
- return false;
-
- case Select:
- return (includeStreams.isEmpty() || includeStreams.contains(s))
- && (excludeStreams.isEmpty() || !excludeStreams.contains(s));
-
- case All:
- return true;
-
- default:
- return false;
- }
- }
-
- private Thread receiverThread = null;
-
- public void start() {
- if (clientWriteMode == ClientWriteMode.Enabled)
- (receiverThread = new Thread(receiver)).start();
- }
-
- public void stop() {
- if (receiverThread != null) {
- receiverThread.interrupt();
- receiverThread = null;
- }
- }
-
- public final Runnable receiver = new Runnable() {
- public void run() {
- try {
- while (good()) {
- byte[] b = io.recv();
-
- // null, empty => goodbye
- if (b == null || b.length == 0) {
- ClientStub.logger.info("client session ended " + uuid);
- break;
- }
-
- EventWrapper ew = ClientConnection.this.clientStub.eventWrapperFromBytes(b);
- if (ew == null)
- continue;
-
- Object event = ew.getEvent();
- if (event instanceof Request) {
- decorateRequest((Request) event);
- ClientStub.logger.info("Decorated client request: "
- + ew.toString());
- }
-
- ClientConnection.this.clientStub.injectEvent(ew);
- }
- } catch (IOException e) {
- ClientStub.logger.info("error while reading from client "
- + uuid, e);
-
- } finally {
- close();
- }
- }
-
- private void decorateRequest(Request r) {
- // add UUID of client into request.
- Request.RInfo info = r.getRInfo();
-
- if (info != null && info instanceof Request.ClientRInfo)
- ((Request.ClientRInfo) info).setRequesterUUID(ClientConnection.this.uuid);
- }
- };
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientReadMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientReadMode.java b/s4-core/src/main/java/io/s4/client/ClientReadMode.java
deleted file mode 100644
index cf374a1..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientReadMode.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package io.s4.client;
-
-/**
- * Client mode.
- */
-public enum ClientReadMode {
- None(false, false), Private(true, false), Select(true, true), All(true, true);
-
- private final boolean priv;
- private final boolean pub;
-
- ClientReadMode(boolean priv, boolean pub) {
- this.priv = priv;
- this.pub = pub;
- }
-
- public boolean takePublic() {
- return pub;
- }
-
- public boolean takePrivate() {
- return priv;
- }
-
- public static ClientReadMode fromString(String s) {
- if (s.equalsIgnoreCase("none"))
- return None;
- else if (s.equalsIgnoreCase("private"))
- return Private;
- else if (s.equalsIgnoreCase("select"))
- return Select;
- else if (s.equalsIgnoreCase("all"))
- return All;
- else
- return null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientStub.java b/s4-core/src/main/java/io/s4/client/ClientStub.java
deleted file mode 100644
index 65b9804..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientStub.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.client;
-
-import io.s4.collector.EventWrapper;
-import io.s4.listener.EventHandler;
-import io.s4.message.Request;
-import io.s4.message.Response;
-import io.s4.util.ByteArrayIOChannel;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-public abstract class ClientStub implements OutputStub, InputStub {
-
- protected static final Logger logger = Logger.getLogger("adapter");
-
- /**
- * Description of the protocol implemented by a concrete instance of this
- * stub.
- */
- public static class Info {
- public final String name;
- public final int versionMajor;
- public final int versionMinor;
-
- public Info(String name, int versionMajor, int versionMinor) {
- this.name = name;
- this.versionMajor = versionMajor;
- this.versionMinor = versionMinor;
- }
- }
-
- /**
- * Meta-information about the protocol that this stub uses to talk to
- * external clients.
- *
- * This is sent to the client as a part of the handshake.
- */
- abstract public Info getProtocolInfo();
-
- /**
- * Stream names that are accepted by this stub to be forwarded to its
- * clients.
- */
- @Override
- public List<String> getAcceptedStreams() {
- return null;
- }
-
- private List<EventHandler> handlers = new ArrayList<EventHandler>();
-
- /**
- * A handler that can inject events produced by this stub into the S4
- * cluster.
- */
- @Override
- public void addHandler(EventHandler handler) {
- this.handlers.add(handler);
- }
-
- /**
- * Remove a handler.
- */
- @Override
- public boolean removeHandler(EventHandler handler) {
- return handlers.remove(handler);
- }
-
- /**
- * Convert an array of bytes into an event wrapper. This method is used to
- * translate data received from a client into events that may be injected
- * into the S4 cluster.
- *
- * @param v
- * array of bytes
- * @return EventWrapper constructed from the byte array.
- */
- abstract public EventWrapper eventWrapperFromBytes(byte[] v);
-
- /**
- * Convert an event wrapper into a byte array. Events received from the S4
- * cluster for dispatching to a client are translated into a byte array
- * using this method.
- *
- * @param e
- * an {@link EventWrapper}
- * @return a byte array
- */
- abstract public byte[] bytesFromEventWrapper(EventWrapper e);
-
- /**
- * Construct an I/O channel over which the stub can communicate with a
- * client. The channel allows arrys of bytes to be exchanged between the
- * stub and client.
- *
- * @param socket
- * TCP/IP socket
- * @return an IO Channel to send and recv byte arrays
- * @throws IOException
- * if the underlying socket could not provide valid input and
- * output streams.
- */
- public IOChannel createIOChannel(Socket socket) throws IOException {
- return new ByteArrayIOChannel(socket);
- }
-
- // send an event into the cluster via adapter.
- void injectEvent(EventWrapper e) {
- for (EventHandler handler : handlers) {
- handler.processEvent(e);
- }
- }
-
- // private List<ClientConnection> clients = new
- // ArrayList<ClientConnection>();
- HashMap<UUID, ClientConnection> clients = new HashMap<UUID, ClientConnection>();
-
- /**
- * Create a client connection and add it to list of clients.
- *
- * @param socket
- * client's I/O socket
- */
- private void addClient(ClientConnection c) {
- synchronized (clients) {
- logger.info("adding client " + c.uuid);
- clients.put(c.uuid, c);
- }
- }
-
- LinkedBlockingQueue<EventWrapper> queue = new LinkedBlockingQueue<EventWrapper>();
-
- @Override
- public int getQueueSize() {
- return queue.size();
- }
-
- @Override
- public void queueWork(EventWrapper e) {
- queue.offer(e);
- }
-
- ServerSocket serverSocket = null;
-
- public void setConnectionPort(int port) throws IOException {
- serverSocket = new ServerSocket(port);
- }
-
- private Thread acceptThread = null;
- private Thread senderThread = null;
-
- public void init() {
- // start accepting new clients and sending events to them
- (acceptThread = new Thread(connectionListener)).start();
- (senderThread = new Thread(sender)).start();
- }
-
- public void shutdown() {
- // stop accepting new clients
- if (acceptThread != null) {
- acceptThread.interrupt();
- acceptThread = null;
- }
-
- // stop sending events to them.
- if (senderThread != null) {
- senderThread.interrupt();
- senderThread = null;
- }
-
- // stop all connected clients.
- List<ClientConnection> clientCopy = new ArrayList<ClientConnection>(clients.values());
- for (ClientConnection c : clientCopy) {
- c.stop();
- c.close();
- }
- }
-
- private final Runnable connectionListener = new Runnable() {
-
- Handshake handshake = null;
-
- public void run() {
- if (handshake == null)
- handshake = new Handshake(ClientStub.this);
-
- try {
- while (serverSocket != null && serverSocket.isBound()
- && !Thread.currentThread().isInterrupted()) {
-
- Socket socket = serverSocket.accept();
-
- ClientConnection connection = handshake.execute(socket);
-
- if (connection != null) {
- addClient(connection);
- connection.start();
- }
-
- }
- } catch (IOException e) {
- logger.info("exception in client connection listener", e);
- }
- }
-
- };
-
- public final Runnable sender = new Runnable() {
- ArrayList<ClientConnection> disconnect = new ArrayList<ClientConnection>();
-
- public void run() {
-
- while (!Thread.currentThread().isInterrupted()) {
- try {
- EventWrapper event = queue.take();
-
- // Responses need special handling.
- if (event.getEvent() instanceof Response) {
- dispatchResponse(event);
- continue;
- }
-
- // TODO: include check to see if the event belongs to a
- // particular client.
-
- dispatchToAllClients(event);
-
- } catch (InterruptedException e) {
- return;
- }
- }
- }
-
- private void dispatchToAllClients(EventWrapper event) {
-
- byte[] b = bytesFromEventWrapper(event);
- String stream = event.getStreamName();
-
- synchronized (clients) {
- for (ClientConnection c : clients.values()) {
- if (c.good() && c.streamAccepted(stream)) {
- try {
- c.io.send(b);
-
- } catch (IOException e) {
- logger.error("error sending message to client "
- + c.uuid + ". disconnecting", e);
-
- disconnect.add(c);
- }
- }
- }
- }
-
- if (disconnect.size() > 0) {
- for (ClientConnection d : disconnect)
- d.close();
-
- disconnect.clear();
- }
- }
-
- private void dispatchResponse(EventWrapper event) {
- Response res = (Response) event.getEvent();
- Request.RInfo rinfo = res.getRInfo();
-
- if (rinfo instanceof Request.ClientRInfo) {
- UUID uuid = ((Request.ClientRInfo) rinfo).getRequesterUUID();
-
- ClientConnection c = clients.get(uuid);
-
- if (c != null && c.good() && c.clientReadMode.takePrivate()) {
- try {
- byte[] b = bytesFromEventWrapper(event);
- if (b != null) c.io.send(b);
-
- } catch (IOException e) {
- logger.error("error sending response to client "
- + c.uuid + ". disconnecting", e);
-
- c.close();
- }
-
- } else {
- logger.warn("no active client found for response: " + res);
- }
- }
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/ClientWriteMode.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/ClientWriteMode.java b/s4-core/src/main/java/io/s4/client/ClientWriteMode.java
deleted file mode 100644
index f102363..0000000
--- a/s4-core/src/main/java/io/s4/client/ClientWriteMode.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package io.s4.client;
-
-/**
- * Client's write mode.
- */
-public enum ClientWriteMode {
- Enabled, Disabled;
-
- public static ClientWriteMode fromString(String s) {
- if (s.equalsIgnoreCase("enabled"))
- return Enabled;
- else if (s.equalsIgnoreCase("disabled"))
- return Disabled;
- else
- return null;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java b/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java
deleted file mode 100644
index ffce5c6..0000000
--- a/s4-core/src/main/java/io/s4/client/GenericJsonClientStub.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.client;
-
-import io.s4.client.util.ObjectBuilder;
-import io.s4.collector.EventWrapper;
-import io.s4.util.GsonUtil;
-
-import java.nio.charset.Charset;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-public class GenericJsonClientStub extends ClientStub {
-
- // private static final ObjectBuilder builder = new ObjectBuilder();
- // private static final Gson builder = new Gson();
-
- private static final Info protocolInfo = new Info("generic-json", 1, 0);
-
- @Override
- public Info getProtocolInfo() {
- return protocolInfo;
- }
-
- @Override
- public EventWrapper eventWrapperFromBytes(byte[] v) {
- try {
- // interpret v as a JSON string
- String s = new String(v, Charset.forName("UTF8"));
- JSONObject json = new JSONObject(s);
-
- String streamName = json.getString("stream");
- String className = json.getString("class");
-
- Class<?> clazz;
- try {
- clazz = Class.forName(className);
- } catch (ClassNotFoundException e) {
- throw new ObjectBuilder.Exception("bad class name for json-encoded object: "
- + className,
- e);
- }
-
- String[] keyNames = null;
- JSONArray keyArray = json.optJSONArray("keys");
- if (keyArray != null) {
- keyNames = new String[keyArray.length()];
- for (int i = 0; i < keyNames.length; ++i) {
- keyNames[i] = keyArray.optString(i);
- }
- }
-
- String jevent = json.getString("object");
-
- Object obj = GsonUtil.get().fromJson(jevent, clazz);
-
- return new EventWrapper(streamName, keyNames, obj);
-
- } catch (JSONException e) {
- logger.error("problem with event JSON", e);
- } catch (ObjectBuilder.Exception e) {
- logger.error("failed to build object from JSON", e);
- }
-
- return null;
- }
-
- @Override
- public byte[] bytesFromEventWrapper(EventWrapper ew) {
- JSONObject jevent = new JSONObject();
-
- Object obj = ew.getEvent();
-
- try {
- jevent.put("stream", ew.getStreamName());
- jevent.put("class", obj.getClass().getName());
- jevent.put("object", GsonUtil.get().toJson(obj));
-
- return jevent.toString().getBytes(Charset.forName("UTF8"));
-
- } catch (JSONException e) {
- logger.error("exception while converting event wrapper to bytes.",
- e);
- return null;
- } catch (IllegalArgumentException iae) {
- logger.error("exception while converting event wrapper to bytes.",
- iae);
- logger.error(obj);
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/Handshake.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/Handshake.java b/s4-core/src/main/java/io/s4/client/Handshake.java
deleted file mode 100644
index b93f94a..0000000
--- a/s4-core/src/main/java/io/s4/client/Handshake.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.client;
-
-import io.s4.client.ClientStub.Info;
-import io.s4.util.ByteArrayIOChannel;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-
-import org.apache.log4j.Logger;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import com.google.gson.Gson;
-
-/**
- * Server-side handshake performed on a freshly accepted client socket.
- * <p>
- * The first message received on the socket decides which phase runs:
- * <ol>
- * <li><b>discovery</b> - the client sent nothing (null or empty payload); a
- * new UUID and the stub's protocol info are sent back and the socket is
- * closed;</li>
- * <li><b>main connect</b> - the client sent a JSON description of itself
- * ({@code uuid}, {@code readMode}, {@code writeMode} and, for selective
- * reads, {@code readInclude}/{@code readExclude}); a
- * {@link ClientConnection} is created and a JSON status is sent back.</li>
- * </ol>
- * All handshake text is encoded and decoded as UTF-8 rather than with the
- * platform default charset.
- */
-public class Handshake {
-    /** Charset name used for every handshake payload on the wire. */
-    private static final String WIRE_CHARSET = "UTF-8";
-
-    private final ClientStub clientStub;
-
-    protected static final Logger logger = Logger.getLogger("adapter");
-
-    protected final Gson gson = new Gson();
-
-    /**
-     * @param clientStub stub that supplies protocol info during discovery and
-     *                   is handed to every connection created here
-     */
-    public Handshake(ClientStub clientStub) {
-        this.clientStub = clientStub;
-    }
-
-    /**
-     * Executes the handshake with a client.
-     * <ol>
-     * <li>discovery: issue uuid;</li>
-     * <li>main connect: create connection.</li>
-     * </ol>
-     *
-     * @param s the client socket; it is closed here whenever no connection
-     *          results (discovery phase, rejected connect, or I/O failure)
-     * @return the established connection, or {@code null} if none was created
-     * @throws RuntimeException if the socket cannot be closed after an I/O
-     *                          failure during the handshake
-     */
-    public ClientConnection execute(Socket s) {
-        try {
-            ByteArrayIOChannel io = new ByteArrayIOChannel(s);
-            byte[] v = io.recv();
-
-            ClientConnection conn = null;
-            if (v == null || v.length == 0) {
-                // no information => client initialization
-                clientInit(io);
-            } else {
-                // some data => client connect
-                conn = clientConnect(v, io, s);
-            }
-
-            if (conn == null) {
-                s.close();
-            }
-
-            return conn;
-
-        } catch (IOException e) {
-            logger.error("exception during handshake", e);
-            try {
-                s.close();
-                return null;
-
-            } catch (IOException ee) {
-                throw new RuntimeException("failed to close socket after failed handshake",
-                                           ee);
-            }
-        }
-    }
-
-    /**
-     * Runs the main-connect phase: tries to create the connection and reports
-     * the outcome to the client as a JSON object with a {@code "status"} of
-     * {@code "ok"} or {@code "failed"} and, on failure, a {@code "reason"}
-     * when one was recorded.
-     *
-     * @return the connection, or {@code null} if it was rejected or the
-     *         response could not be built
-     * @throws IOException if the response cannot be sent
-     */
-    private ClientConnection clientConnect(byte[] v, ByteArrayIOChannel io,
-                                           Socket sock) throws IOException {
-
-        List<String> reason = new ArrayList<String>(1);
-        ClientConnection conn = clientConnectCreate(v, io, sock, reason);
-
-        String message;
-        try {
-            JSONObject resp = new JSONObject();
-
-            resp.put("status", (conn != null ? "ok" : "failed"));
-
-            if (conn == null && !reason.isEmpty()) {
-                resp.put("reason", reason.get(0));
-            }
-
-            message = resp.toString();
-
-        } catch (JSONException e) {
-            logger.error("error creating response during connect.", e);
-            return null;
-        }
-
-        // explicit charset: the default one varies between platforms
-        io.send(message.getBytes(WIRE_CHARSET));
-
-        return conn;
-    }
-
-    /**
-     * Parses and validates the client's JSON description and builds the
-     * connection.
-     *
-     * @param v      raw request bytes, decoded as UTF-8 JSON
-     * @param io     channel the request arrived on (not used here)
-     * @param sock   client socket handed to the new connection
-     * @param reason out-parameter; a short rejection reason is appended when
-     *               {@code null} is returned
-     * @return the new connection, or {@code null} if the request was rejected
-     * @throws IOException declared for the {@link ClientConnection}
-     *                     constructor and charset decoding
-     */
-    private ClientConnection clientConnectCreate(byte[] v,
-                                                 ByteArrayIOChannel io,
-                                                 Socket sock,
-                                                 List<String> reason)
-            throws IOException {
-
-        try {
-            JSONObject cInfo = new JSONObject(new String(v, WIRE_CHARSET));
-
-            String s = cInfo.optString("uuid", "");
-            if (s.isEmpty()) {
-                logger.error("missing client identifier during handshake.");
-                reason.add("missing UUID");
-                return null;
-            }
-
-            UUID u = UUID.fromString(s);
-
-            logger.info("connecting to client " + u);
-
-            s = cInfo.optString("readMode", "Private");
-            ClientReadMode rmode = ClientReadMode.fromString(s);
-            if (rmode == null) {
-                logger.error(u + ": unknown readMode " + s);
-                reason.add("unknown readMode " + s);
-                return null;
-            }
-
-            s = cInfo.optString("writeMode", "Enabled");
-            ClientWriteMode wmode = ClientWriteMode.fromString(s);
-            if (wmode == null) {
-                logger.error(u + ": unknown writeMode " + s);
-                reason.add("unknown writeMode " + s);
-                return null;
-            }
-
-            logger.info(u + " read=" + rmode + " write=" + wmode);
-
-            if (rmode == ClientReadMode.None
-                    && wmode == ClientWriteMode.Disabled) {
-                // client cannot disable read AND write...
-                logger.error("client neither reads nor writes.");
-                reason.add("read and write disabled");
-
-                return null;
-            }
-
-            // Resolve the stream selection before building the connection so
-            // that a rejected request never constructs a ClientConnection
-            // that would be thrown away immediately.
-            List<String> included = null;
-            List<String> excluded = null;
-            if (rmode == ClientReadMode.Select) {
-                JSONArray incl = cInfo.optJSONArray("readInclude");
-                JSONArray excl = cInfo.optJSONArray("readExclude");
-
-                if (incl == null && excl == null) {
-                    logger.error(u + ": missing stream selection information");
-                    reason.add("missing readInclude and readExclude");
-                    return null;
-                }
-
-                included = toStringList(incl);
-                excluded = toStringList(excl);
-            }
-
-            ClientConnection conn = new ClientConnection(clientStub, sock, u, rmode, wmode);
-
-            if (included != null) {
-                conn.includeStreams(included);
-            }
-
-            if (excluded != null) {
-                conn.excludeStreams(excluded);
-            }
-
-            return conn;
-
-        } catch (JSONException e) {
-            logger.error("malformed JSON from client during handshake", e);
-            reason.add("malformed JSON");
-
-        } catch (IllegalArgumentException e) {
-            // also covers NumberFormatException, which is a subclass
-            logger.error("received malformed UUID", e);
-            reason.add("malformed UUID");
-        }
-
-        return null;
-    }
-
-    /**
-     * Copies the string elements of a JSON array into a list.
-     *
-     * @return the elements in order, or {@code null} if {@code array} is
-     *         {@code null}
-     * @throws JSONException if an element cannot be read as a string
-     */
-    private static List<String> toStringList(JSONArray array)
-            throws JSONException {
-        if (array == null) {
-            return null;
-        }
-
-        List<String> items = new ArrayList<String>(array.length());
-        for (int i = 0; i < array.length(); ++i) {
-            items.add(array.getString(i));
-        }
-
-        return items;
-    }
-
-    /**
-     * Runs the discovery phase: sends the client a newline-terminated JSON
-     * object holding a freshly generated {@code "uuid"} and the stub's
-     * {@code "protocol"} info.
-     *
-     * @throws IOException if the response cannot be sent
-     */
-    private void clientInit(ByteArrayIOChannel io) throws IOException {
-        String uuid = UUID.randomUUID().toString();
-        Info proto = clientStub.getProtocolInfo();
-
-        HashMap<String, Object> info = new HashMap<String, Object>();
-        info.put("uuid", uuid);
-        info.put("protocol", proto);
-
-        String response = gson.toJson(info) + "\n";
-
-        io.send(response.getBytes(WIRE_CHARSET));
-    }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/IOChannel.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/IOChannel.java b/s4-core/src/main/java/io/s4/client/IOChannel.java
deleted file mode 100644
index e804b05..0000000
--- a/s4-core/src/main/java/io/s4/client/IOChannel.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.client;
-
-import java.io.IOException;
-
-/**
- * Byte-array based, bidirectional channel: one method to receive a byte
- * array and one to send a byte array.
- * <p>
- * NOTE(review): how payloads are framed on the wire is up to the
- * implementation and is not visible from this interface - confirm against
- * the implementing classes.
- */
-public interface IOChannel {
- /**
-  * Receives bytes from the channel.
-  *
-  * @return the received bytes. NOTE(review): callers such as Handshake
-  *         treat a null or zero-length result as "no data" - confirm
-  *         which of the two implementations actually return.
-  * @throws IOException if receiving fails
-  */
- byte[] recv() throws IOException;
-
- /**
-  * Sends the given bytes over the channel.
-  *
-  * @param v the bytes to send
-  * @throws IOException if sending fails
-  */
- void send(byte[] v) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/InputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/InputStub.java b/s4-core/src/main/java/io/s4/client/InputStub.java
deleted file mode 100644
index a63ccb8..0000000
--- a/s4-core/src/main/java/io/s4/client/InputStub.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.client;
-
-import io.s4.listener.EventProducer;
-
-/**
- * Client-package specialization of {@link EventProducer}.
- * <p>
- * Declares no members of its own; everything it offers is inherited from
- * {@code EventProducer}.
- */
-public interface InputStub extends EventProducer {
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/a7b4afb0/s4-core/src/main/java/io/s4/client/OutputStub.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/io/s4/client/OutputStub.java b/s4-core/src/main/java/io/s4/client/OutputStub.java
deleted file mode 100644
index b68573d..0000000
--- a/s4-core/src/main/java/io/s4/client/OutputStub.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package io.s4.client;
-
-import io.s4.processor.AsynchronousEventProcessor;
-
-import java.util.List;
-
-/**
- * An {@link AsynchronousEventProcessor} that can additionally report a list
- * of accepted streams.
- */
-public interface OutputStub extends AsynchronousEventProcessor {
-
- /**
-  * @return the accepted streams as a list of strings. NOTE(review):
-  *         presumably these are stream names this stub is willing to
-  *         process - confirm against the implementations.
-  */
- List<String> getAcceptedStreams();
-
-}
\ No newline at end of file