You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by ki...@apache.org on 2013/02/13 16:26:35 UTC

git commit: [S4-110] Fixing the emitters and added remotesender impl. Currently supports only one cluster

Updated Branches:
  refs/heads/S4-110-new 9690dd120 -> 16e5353a0


[S4-110] Fixing the emitters and added remotesender impl. Currently supports only one cluster


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/16e5353a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/16e5353a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/16e5353a

Branch: refs/heads/S4-110-new
Commit: 16e5353a079294981d4cb5ab26381bb3a17cd57e
Parents: 9690dd1
Author: kishoreg <ki...@apache.org>
Authored: Wed Feb 13 08:26:25 2013 -0800
Committer: kishoreg <ki...@apache.org>
Committed: Wed Feb 13 08:26:25 2013 -0800

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    2 +-
 .../apache/s4/comm/topology/ClusterFromHelix.java  |    2 +-
 .../s4/comm/util/FileSystemArchiveFetcher.java     |    6 +
 .../main/java/org/apache/s4/core/BaseModule.java   |    2 +-
 .../org/apache/s4/core/HelixBasedCoreModule.java   |    4 +-
 .../org/apache/s4/core/HelixRemoteSenders.java     |  121 +++++++++++++++
 .../main/java/org/apache/s4/core/RemoteSender.java |    7 +
 .../java/org/apache/s4/core/util/AppConfig.java    |   34 ++++-
 .../java/org/apache/s4/deploy/AppStateModel.java   |   19 ++-
 .../s4/deploy/HelixBasedDeploymentManager.java     |    2 +-
 .../java/org/apache/s4/tools/helix/DeployApp.java  |    1 -
 .../apache/s4/tools/helix/GenericEventAdapter.java |    3 -
 .../s4/example/twitter/TwitterInputAdapter.java    |    2 +
 13 files changed, 187 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 89a2e42..1bdea43 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -268,7 +268,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
 
     @Override
     public int getPartitionCount(String streamName) {
-        return topology.getPhysicalCluster().getPartitionCount(streamName);
+        return topology.getPartitionCount(streamName);
     }
 
     class ExceptionHandler extends SimpleChannelUpstreamHandler {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
index 2ba5b9a..902d202 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromHelix.java
@@ -195,7 +195,7 @@ public class ClusterFromHelix extends RoutingTableProvider implements Cluster {
             for (ClusterChangeListener listener : listeners) {
                 listener.onChange();
             }
-            logger.info("End:Processing change in cluster topology");
+            logger.info("End:Processing change in cluster topology:"+partitionCountMapRef);
 
         } catch (Exception e) {
             logger.error("", e);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
index 294877f..61c7fc5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/FileSystemArchiveFetcher.java
@@ -23,6 +23,7 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.InputStream;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 /**
  * Fetches modules jar files and application S4R files from a file system, possibly distributed.
@@ -38,4 +39,9 @@ public class FileSystemArchiveFetcher implements ArchiveFetcher {
             throw new ArchiveFetchException("Cannot retrieve file from uri [" + uri.toString() + "]");
         }
     }
+    public static void main(String[] args) throws ArchiveFetchException, URISyntaxException {
+        FileSystemArchiveFetcher fetcher = new FileSystemArchiveFetcher();
+        fetcher.fetch(new URI("file:///home/kgopalak/projects/incubator-s4/test-apps/twitter-adapter/build/libs/twitter-adapter.s4r"));
+        
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
index 6062305..5715060 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -56,7 +56,7 @@ public class BaseModule extends AbstractModule {
         String clusterManager = System.getenv("S4_CLUSTER_MANAGER");
         if (config.getBoolean("s4.helix") || "HELIX".equalsIgnoreCase(clusterManager)) {
             bind(Assignment.class).to(AssignmentFromHelix.class).asEagerSingleton();
-            bind(Cluster.class).to(ClusterFromHelix.class);
+            bind(Cluster.class).to(ClusterFromHelix.class).in(Scopes.SINGLETON);
             bind(TaskStateModelFactory.class);
             bind(AppStateModelFactory.class).in(Scopes.SINGLETON);
             bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
index 01f086c..94a1bb3 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixBasedCoreModule.java
@@ -48,10 +48,10 @@ public class HelixBasedCoreModule extends AbstractModule {
     @Override
     protected void configure() {
 
-        bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
+        //bind(DeploymentManager.class).to(HelixBasedDeploymentManager.class).in(Scopes.SINGLETON);
 
         bind(RemoteStreams.class).to(HelixRemoteStreams.class).in(Scopes.SINGLETON);
-        bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
+        bind(RemoteSenders.class).to(HelixRemoteSenders.class).in(Scopes.SINGLETON);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixRemoteSenders.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixRemoteSenders.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixRemoteSenders.java
new file mode 100644
index 0000000..3474cc1
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/HelixRemoteSenders.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.s4.core;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.SerializerDeserializerFactory;
+import org.apache.s4.comm.tcp.RemoteEmitters;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Clusters;
+import org.apache.s4.comm.topology.RemoteStreams;
+import org.apache.s4.comm.topology.StreamConsumer;
+import org.apache.s4.core.staging.RemoteSendersExecutorServiceFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class HelixRemoteSenders implements RemoteSenders {
+
+    Logger logger = LoggerFactory.getLogger(HelixRemoteSenders.class);
+
+    final RemoteEmitters remoteEmitters;
+
+    final SerializerDeserializer serDeser;
+
+    final Hasher hasher;
+
+    ConcurrentMap<String, RemoteSender> sendersByTopology = new ConcurrentHashMap<String, RemoteSender>();
+
+    private final ExecutorService executorService;
+
+    private String clusterName;
+
+    private RemoteSender sender;
+
+    @Inject
+    public HelixRemoteSenders(@Named("s4.cluster.name") String clusterName,Cluster topology,
+            RemoteEmitters remoteEmitters,
+            SerializerDeserializerFactory serDeserFactory, Hasher hasher,
+            RemoteSendersExecutorServiceFactory senderExecutorFactory) {
+        this.remoteEmitters = remoteEmitters;
+        this.clusterName = clusterName;
+        this.hasher = hasher;
+        executorService = senderExecutorFactory.create();
+
+        serDeser = serDeserFactory.createSerializerDeserializer(Thread
+                .currentThread().getContextClassLoader());
+        sender = new RemoteSender(topology,
+                remoteEmitters.getEmitter(topology), hasher,
+                clusterName);
+    }
+
+    /*
+     * (non-Javadoc)
+     * 
+     * @see org.apache.s4.core.RemoteSenders#send(java.lang.String,
+     * org.apache.s4.base.Event)
+     */
+    @Override
+    public void send(String hashKey, Event event) {
+        event.setAppId(-1);
+        // NOTE: this implies multiple serializations, there might be an
+        // optimization
+        executorService.execute(new SendToRemoteClusterTask(hashKey, event,
+                sender));
+    }
+
+    class SendToRemoteClusterTask implements Runnable {
+
+        String hashKey;
+        Event event;
+        RemoteSender sender;
+
+        public SendToRemoteClusterTask(String hashKey, Event event,
+                RemoteSender sender) {
+            super();
+            this.hashKey = hashKey;
+            this.event = event;
+            this.sender = sender;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sender.send(event.getStreamName(), hashKey,
+                        serDeser.serialize(event));
+            } catch (InterruptedException e) {
+                logger.error(
+                        "Interrupted blocking send operation for event {}. Event is lost.",
+                        event);
+                Thread.currentThread().interrupt();
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
index ad9e02d..43d01c8 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java
@@ -25,12 +25,15 @@ import org.apache.s4.base.Destination;
 import org.apache.s4.base.Emitter;
 import org.apache.s4.base.Hasher;
 import org.apache.s4.comm.topology.Cluster;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Sends events to a remote cluster.
  * 
  */
 public class RemoteSender {
+    private static final Logger logger = LoggerFactory.getLogger(RemoteSender.class);
 
     final private Emitter emitter;
     final private Hasher hasher;
@@ -48,6 +51,7 @@ public class RemoteSender {
     }
 
     public void send(String streamName,String hashKey, ByteBuffer message) throws InterruptedException {
+        
         int partition;
         if (hashKey == null) {
             // round robin by default
@@ -56,7 +60,10 @@ public class RemoteSender {
             partition = (int) (hasher.hash(hashKey) % emitter.getPartitionCount(streamName));
         }
         //TODO: where do we get the mode
+        
         Destination destination = cluster.getDestination(streamName, partition, emitter.getType());
+        logger.info("Sending event to partition:"+ partition + " stream: "+streamName);
+        
         emitter.send(destination, message);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
index 5b9b0e7..8ef87c6 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/util/AppConfig.java
@@ -47,6 +47,29 @@ public class AppConfig {
         this.customModulesURIs = customModulesURIs;
         this.namedParameters = namedParameters;
     }
+    public AppConfig(String appName, String appClassName, String appURI, List<String> customModulesNames,
+            List<String> customModulesURIs, String namedParametersAsString) {
+        super();
+        this.appName = appName;
+        this.appClassName = appClassName;
+        this.appURI = appURI;
+        this.customModulesNames = customModulesNames;
+        this.customModulesURIs = customModulesURIs;
+        this.namedParameters = convertStringToMap(namedParametersAsString);
+    }
+
+    final private static Map<String, String> convertStringToMap(
+            String namedParametersAsString) {
+        Map<String, String> map = Maps.newHashMap();
+        String[] kvs = namedParametersAsString.split(",");
+        for(String kv:kvs){
+            String[] split = kv.split("[=]");
+            if(split.length==2){
+                map.put(split[0], split[1]);
+            }
+        }
+        return map;
+    }
 
     public String getAppName() {
         return appName;
@@ -81,8 +104,10 @@ public class AppConfig {
             return "";
         }
         StringBuilder sb = new StringBuilder();
+        String delim = "";
         for (Map.Entry<String, String> param : namedParameters.entrySet()) {
-            sb.append(param.getKey() + "=" + param.getValue() + ",");
+            sb.append(param.getKey() + "=" + param.getValue() ).append(delim);
+            delim=",";
         }
         return sb.toString();
     }
@@ -180,6 +205,13 @@ public class AppConfig {
             }
             return this;
         }
+        public Builder namedParameters(String namedParametersAsString) {
+            if(namedParametersAsString!=null){
+                Map<String, String> namedParameters = convertStringToMap(namedParametersAsString);
+                config.namedParameters = namedParameters;
+            }
+            return this;
+        }
 
         public AppConfig build() {
             return config;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
index b70c27f..5a0d03b 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/AppStateModel.java
@@ -39,7 +39,8 @@ public class AppStateModel extends StateModel {
     }
 
     @Transition(from = "OFFLINE", to = "ONLINE")
-    public void deploy(Message message, NotificationContext context) throws Exception {
+    public void deploy(Message message, NotificationContext context)
+            throws Exception {
         logger.info("Deploying app:" + appName);
         HelixManager manager = context.getManager();
         ConfigAccessor configAccessor = manager.getConfigAccessor();
@@ -58,7 +59,8 @@ public class AppStateModel extends StateModel {
                 .customModulesNames(
                         getListFromCommaSeparatedValues(configAccessor.get(scope, AppConfig.MODULES_CLASSES)))
                 .customModulesURIs(getListFromCommaSeparatedValues(configAccessor.get(scope, AppConfig.MODULES_URIS)))
-                .appURI(configAccessor.get(scope, AppConfig.APP_URI)).build();
+                .appURI(configAccessor.get(scope, AppConfig.APP_URI))
+                .namedParameters(configAccessor.get(scope, AppConfig.NAMED_PARAMETERS)).build();
 
         return appConfig;
     }
@@ -72,7 +74,8 @@ public class AppStateModel extends StateModel {
     }
 
     @Transition(from = "ONLINE", to = "OFFLINE")
-    public void undeploy(Message message, NotificationContext context) throws Exception {
+    public void undeploy(Message message, NotificationContext context)
+            throws Exception {
         logger.info("Undeploying app:" + appName);
         HelixManager manager = context.getManager();
         ConfigAccessor configAccessor = manager.getConfigAccessor();
@@ -82,17 +85,19 @@ public class AppStateModel extends StateModel {
 
     }
 
-    private void loadModulesAndStartNode(final Injector parentInjector, final AppConfig appConfig)
-            throws ArchiveFetchException {
+    private void loadModulesAndStartNode(final Injector parentInjector,
+            final AppConfig appConfig) throws ArchiveFetchException {
 
         String appName = appConfig.getAppName();
 
         List<File> modulesLocalCopies = new ArrayList<File>();
 
         for (String uriString : appConfig.getCustomModulesURIs()) {
-            modulesLocalCopies.add(S4Bootstrap.fetchModuleAndCopyToLocalFile(appName, uriString, fetcher));
+            modulesLocalCopies.add(S4Bootstrap.fetchModuleAndCopyToLocalFile(
+                    appName, uriString, fetcher));
         }
-        final ModulesLoader modulesLoader = new ModulesLoaderFactory().createModulesLoader(modulesLocalCopies);
+        final ModulesLoader modulesLoader = new ModulesLoaderFactory()
+                .createModulesLoader(modulesLocalCopies);
 
         Thread t = new Thread(new Runnable() {
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
index 62a16a3..c5ba6ed 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/deploy/HelixBasedDeploymentManager.java
@@ -30,7 +30,7 @@ public class HelixBasedDeploymentManager implements DeploymentManager {
 
     @Override
     public void deploy(AppConfig appConfig) throws DeploymentFailedException {
-        //DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
+        DeploymentUtils.deploy(server, fetcher, clusterName, appConfig);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
index cc3d128..693e13d 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/DeployApp.java
@@ -41,7 +41,6 @@ public class DeployApp extends S4ArgsBase {
 
         HelixAdmin admin = new ZKHelixAdmin(deployArgs.zkConnectionString);
         ConfigScopeBuilder builder = new ConfigScopeBuilder();
-        // ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
         ConfigScope scope = builder.forCluster(deployArgs.clusterName).forResource(deployArgs.appName).build();
         Map<String, String> properties = new HashMap<String, String>();
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
index b16d4e0..bb86312 100644
--- a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/helix/GenericEventAdapter.java
@@ -1,8 +1,5 @@
 package org.apache.s4.tools.helix;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/16e5353a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
----------------------------------------------------------------------
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
index 9afb04e..8f06433 100644
--- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -89,6 +89,7 @@ public class TwitterInputAdapter extends AdapterApp {
 
             @Override
             public void onStatus(Status status) {
+                logger.info("Adding status "+ status.getText());
                 messageQueue.add(status);
 
             }
@@ -125,6 +126,7 @@ public class TwitterInputAdapter extends AdapterApp {
             while (true) {
                 try {
                     Status status = messageQueue.take();
+                    logger.info("Sending status "+ status.getText());
                     Event event = new Event();
                     event.put("statusText", String.class, status.getText());
                     getRemoteStream().put(event);