You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/06/15 16:06:01 UTC

[2/22] Merge branch 'S4-22' into piper

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 6446a7c,935d4a5..fe4ceed
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@@ -15,9 -15,8 +15,9 @@@
   */
  package org.apache.s4.core;
  
- import java.util.Arrays;
- import java.util.Collection;
+ import java.util.ArrayList;
+ import java.util.List;
 +import java.util.Map;
  import java.util.concurrent.TimeUnit;
  
  import org.apache.s4.base.Event;
@@@ -26,32 -29,28 +30,34 @@@ import org.apache.s4.core.App.ClockType
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import com.google.common.collect.LinkedListMultimap;
 +import com.google.common.collect.Maps;
- import com.google.common.collect.Multimap;
- import com.google.inject.AbstractModule;
- import com.google.inject.Guice;
  import com.google.inject.Inject;
- import com.google.inject.Injector;
+ import com.google.inject.name.Named;
  
  /*
-  * Container base class to hold all processing elements. We will implement administrative methods here. 
+  * Container base class to hold all processing elements. We will implement administrative methods here.
   */
  public abstract class App {
  
      static final Logger logger = LoggerFactory.getLogger(App.class);
  
-     /* PE prototype to streams relations. */
-     final private Multimap<ProcessingElement, Streamable<? extends Event>> pe2stream = LinkedListMultimap.create();
+     /* All the PE prototypes in this app. */
+     final private List<ProcessingElement> pePrototypes = new ArrayList<ProcessingElement>();
  
-     /* Stream to PE prototype relations. */
-     final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
++    // /* Stream to PE prototype relations. */
++    // final private Multimap<Streamable<? extends Event>, ProcessingElement> stream2pe = LinkedListMultimap.create();
+     /* All the internal streams in this app. */
+     final private List<Streamable<Event>> streams = new ArrayList<Streamable<Event>>();
+ 
+     /* All the the event sources exported by this app. */
+     final private List<EventSource> eventSources = new ArrayList<EventSource>();
  
 +    /* Pes indexed by name. */
 +    Map<String, ProcessingElement> peByName = Maps.newHashMap();
 +
      private ClockType clockType = ClockType.WALL_CLOCK;
      private int id = -1;
+ 
      @Inject
      private Sender sender;
      @Inject
@@@ -90,33 -103,44 +110,59 @@@
          this.id = id;
      }
  
-     /**
-      * @return all the pePrototypes
-      */
-     Collection<ProcessingElement> getPePrototypes() {
-         return pe2stream.keySet();
+     /* Should only be used within the core package. */
+     void addPEPrototype(ProcessingElement pePrototype) {
+         pePrototypes.add(pePrototype);
      }
  
-     /**
-      * @return all the pePrototypes
-      */
-     <T extends Event> Collection<ProcessingElement> getTargetPEs(Stream<T> stream) {
++    public ProcessingElement getPE(String name) {
++
++        return peByName.get(name);
++    }
++
+     /* Should only be used within the core package. */
+     public void addStream(Streamable stream) {
+         streams.add(stream);
+     }
+ 
+     /* Should only be used within the core package. */
+     void addEventSource(EventSource es) {
+         eventSources.add(es);
+     }
+ 
+     /* Returns list of PE prototypes. Should only be used within the core package. */
+     List<ProcessingElement> getPePrototypes() {
+         return pePrototypes;
+     }
  
-         Map<Streamable<?>, Collection<ProcessingElement>> stream2peMap = stream2pe.asMap();
++    // void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
++    // logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
++    // stream2pe.put(stream, pePrototype);
++    //
++    // }
++
+     /* Returns list of internal streams. Should only be used within the core package. */
+     // TODO visibility
+     public List<Streamable<Event>> getStreams() {
+         return streams;
+     }
  
-         return stream2peMap.get(stream);
+     /* Returns list of the event sources to be exported. Should only be used within the core package. */
+     // TODO visibility
+     public List<EventSource> getEventSources() {
+         return eventSources;
      }
  
      protected abstract void onStart();
  
 +    /**
 +     * This method is called by the container after initialization. Once this method is called, threads get started and
 +     * events start flowing.
 +     */
-     protected void start() {
- 
-         logger.info("Prepare to start App [{}].", getClass().getName());
+     public final void start() {
  
+         // logger.info("Prepare to start App [{}].", getClass().getName());
+         //
          /* Start all streams. */
          for (Streamable<? extends Event> stream : getStreams()) {
              stream.start();
@@@ -131,22 -155,16 +177,22 @@@
          onStart();
      }
  
 +    /**
 +     * This method is called by the container to initialize applications.
 +     */
      protected abstract void onInit();
  
-     protected void init() {
+     public final void init() {
  
          onInit();
      }
  
 +    /**
 +     * This method is called by the container before unloading the application.
 +     */
      protected abstract void onClose();
  
-     protected void close() {
+     public final void close() {
  
          onClose();
          removeAll();
@@@ -169,31 -181,24 +215,18 @@@
  
          }
  
 -        /* Get the set of streams and close them. */
 -        for (Streamable<?> stream : getStreams()) {
 -            logger.trace("Closing stream [{}].", stream.getName());
 -            stream.close();
 -        }
 -
          /* Finally remove the entire app graph. */
          logger.trace("Clear app graph.");
-         pe2stream.clear();
-         stream2pe.clear();
-     }
- 
-     void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
  
-         logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
-         pe2stream.put(pePrototype, stream);
+         pePrototypes.clear();
+         streams.clear();
      }
  
-     public ProcessingElement getPE(String name) {
- 
-         return peByName.get(name);
-     }
- 
-     void addStream(Streamable<? extends Event> stream, ProcessingElement pePrototype) {
-         logger.info("Add Stream [{}] with PE prototype [{}].", toString(stream), toString(pePrototype));
-         stream2pe.put(stream, pePrototype);
+     void addPEPrototype(ProcessingElement pePrototype, Stream<? extends Event> stream) {
  
-     }
+         // logger.info("Add PE prototype [{}] with stream [{}].", toString(pePrototype), toString(stream));
+         pePrototypes.add(pePrototype);
  
-     Collection<Streamable<? extends Event>> getStreams() {
-         return stream2pe.keySet();
      }
  
      /**
@@@ -300,32 -309,30 +337,53 @@@
       */
      protected <T extends Event> Stream<T> createStream(String name, ProcessingElement... processingElements) {
  
 -        return new Stream<T>(this, name, processingElements);
 +        return new Stream<T>(this).setName(name).setPEs(processingElements);
 +    }
 +
 +    /**
 +     * Creates stream with default values. Use the builder methods to configure the stream. Example:
 +     * <p>
 +     * 
 +     * <pre>
 +     *  s1 = <SampleEvent> createStream().withName("My first stream.").withKey(new AKeyFinder()).to(somePE);
 +     * </pre>
 +     * 
 +     * <p>
 +     * 
 +     * @param name
 +     *            the name of the stream
 +     * @param processingElements
 +     *            the target processing elements
 +     * @return the stream
 +     */
 +    public <T extends Event> Stream<T> createStream(Class<T> type) {
 +
 +        Stream<T> stream = new Stream<T>(this);
 +        stream.setEventType(type);
 +        return stream;
      }
  
+     protected <T extends Event> RemoteStream createOutputStream(String name) {
+         return createOutputStream(name, null);
+     }
+ 
+     protected <T extends Event> RemoteStream createOutputStream(String name, KeyFinder<Event> finder) {
+         return new RemoteStream(this, name, finder, remoteSenders, hasher, remoteStreams, clusterName);
+     }
+ 
+     protected <T extends Event> Stream<T> createInputStream(String streamName, KeyFinder<T> finder,
+             ProcessingElement... processingElements) {
+         remoteStreams.addInputStream(getId(), clusterName, streamName);
+         return createStream(streamName, finder, processingElements);
+ 
+     }
+ 
+     protected <T extends Event> Stream<T> createInputStream(String streamName, ProcessingElement... processingElements) {
+         remoteStreams.addInputStream(getId(), clusterName, streamName);
+         return createStream(streamName, processingElements);
+ 
+     }
+ 
      /**
       * Creates a {@link ProcessingElement} prototype.
       * 
@@@ -340,29 -345,34 +398,49 @@@
          try {
              // TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.
              Class<?>[] types = new Class<?>[] { App.class };
-             T pe = type.getDeclaredConstructor(types).newInstance(this);
-             pe.setName(name);
-             return pe;
- 
+             try {
+                 T pe = type.getDeclaredConstructor(types).newInstance(this);
++                pe.setName(name);
+                 return pe;
+             } catch (NoSuchMethodException e) {
+                 // no such constructor. Use the setter
+                 T pe = type.getDeclaredConstructor(new Class[] {}).newInstance();
+                 pe.setApp(this);
++                pe.setName(name);
+                 return pe;
+             }
          } catch (Exception e) {
              logger.error(e.getMessage(), e);
              return null;
          }
      }
  
 +    /**
 +     * Creates a {@link ProcessingElement} prototype.
 +     * 
 +     * @param type
 +     *            the processing element type.
 +     * @return the processing element prototype.
 +     */
 +    public <T extends ProcessingElement> T createPE(Class<T> type) {
 +
 +        return createPE(type, null);
 +
 +    }
 +
+     public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
+             int numSlots) {
+         try {
+             Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+             T pe = type.getDeclaredConstructor(types).newInstance(
+                     new Object[] { this, slotDuration, timeUnit, numSlots });
+             return pe;
+         } catch (Exception e) {
+             logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);
+             return null;
+         }
+     }
+ 
      static private String toString(ProcessingElement pe) {
          return pe != null ? pe.getClass().getName() + " " : "null ";
      }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 152f24e,1576044..df1b720
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@@ -39,43 -39,40 +39,39 @@@ import com.google.common.collect.MapMak
  import com.google.common.collect.Maps;
  
  /**
-  * @author Leo Neumeyer
-  * @author Matthieu Morel
-  *         <p>
-  *         Base class for implementing processing in S4. All instances are organized as follows:
-  *         <ul>
-  *         <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
-  *         application graph.
-  *         <li>PE prototypes manage the creation and destruction of PE instances.
-  *         <li>All PE instances are clones of a PE prototype.
-  *         <li>PE instances are associated with a unique key.
-  *         <li>PE instances do the actual work by processing any number of input events of various types and emit output
-  *         events of various types.
-  *         <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method.
-  *         See {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
-  *         <ul>
-  *         <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be
-  *         dispatched to this method.
-  *         <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
-  *         dispatched to this method when certain conditions are met. See
-  *         {@link #setTrigger(Class, int, long, TimeUnit)}.
-  *         </ul>
-  *         <li>
-  *         A PE implementation must not create threads. A periodic task can be implemented by overloading the
-  *         {@link #onTime()} method. See {@link #setTimerInterval(long, TimeUnit)}
-  *         <li>If a reference in the PE prototype shared by the PE instances, the object must be thread safe.
-  *         <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
-  *         <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there
-  *         may be several event processing methods that can safely run concurrently. To enable concurrency, annotate the
-  *         implementation of {@code ProcessingElement} with {@link ThreadSafe}.
-  *         <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()}
-  *         method.
-  *         <li>PE class fields are cloned from the prototype. References are also copied which means that if the
-  *         prototype creates a collection object, all instances will be sharing the same collection object which is
-  *         usually <em>NOT</em> what the programmer intended . The application developer is responsible for initializing
-  *         objects in the {@link #onCreate()} method. For example, if each instance requires a
-  *         <tt>List<tt/> object the PE should implement the following:
+  * <p>
+  * Base class for implementing processing in S4. All instances are organized as follows:
+  * <ul>
+  * <li>A PE prototype is a special type of instance that, along with {@link Stream} defines the topology of the
+  * application graph.
+  * <li>PE prototypes manage the creation and destruction of PE instances.
+  * <li>All PE instances are clones of a PE prototype.
+  * <li>PE instances are associated with a unique key.
+  * <li>PE instances do the actual work by processing any number of input events of various types and emit output events
+  * of various types.
+  * <li>To process events, {@code ProcessingElement} dynamically matches an event type to a processing method. See
+  * {@link org.apache.s4.core.gen.OverloadDispatcher} . There are two types of processing methods:
+  * <ul>
+  * <li>{@code onEvent(SomeEvent event)} When implemented, input events of type {@code SomeEvent} will be dispatched to
+  * this method.
+  * <li>{@code onTrigger(AnotherEvent event)} When implemented, input events of type {@code AnotherEvent} will be
+  * dispatched to this method when certain conditions are met. See {@link #setTrigger(Class, int, long, TimeUnit)}.
+  * </ul>
+  * <li>
+  * A PE implementation must not create threads. A periodic task can be implemented by overloading the {@link #onTime()}
+  * method. See {@link #setTimerInterval(long, TimeUnit)}
+  * <li>If a reference in the PE prototype shared by the PE instances, the object must be thread safe.
+  * <li>The code in a PE instance is synchronized by the framework to avoid concurrency problems.
+  * <li>In some special cases, it may be desirable to allow concurrency in the PE instance. For example, there may be
+  * several event processing methods that can safely run concurrently. To enable concurrency, annotate the implementation
+  * of {@code ProcessingElement} with {@link ThreadSafe}.
+  * <li>PE instances never use the constructor. They must be initialized by implementing the {@link #onCreate()} method.
+  * <li>PE class fields are cloned from the prototype. References are also copied which means that if the prototype
+  * creates a collection object, all instances will be sharing the same collection object which is usually <em>NOT</em>
+  * what the programmer intended . The application developer is responsible for initializing objects in the
+  * {@link #onCreate()} method. For example, if each instance requires a
+  * <tt>List<tt/> object the PE should implement the following:
   *         <pre>
 - *         {@code
   *         public class MyPE extends ProcessingElement {
   * 
   *           private Map<String, Integer> wordCount;
@@@ -377,9 -356,6 +382,9 @@@ public abstract class ProcessingElemen
              return this;
  
          timer = new Timer();
 +        logger.info("Created timer for PE prototype [{}] with interval [{}].", this.getClass().getName(),
 +                timerIntervalInMilliseconds);
- 
++        timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
          return this;
      }
  

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index 1790e44,8e04fc4..96b7b62
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@@ -21,15 -5,11 +5,14 @@@ import java.util.concurrent.ArrayBlocki
  import java.util.concurrent.BlockingQueue;
  
  import org.apache.s4.base.Event;
+ import org.apache.s4.base.EventMessage;
 +import org.apache.s4.base.GenericKeyFinder;
- import org.apache.s4.base.Key;
  import org.apache.s4.base.KeyFinder;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import com.google.common.base.Preconditions;
- import com.google.common.collect.Sets;
 +
  /**
   * {@link Stream} and {@link ProcessingElement} objects represent the links and nodes in the application graph. A stream
   * sends an {@link Event} object to {@link ProcessingElement} instances located anywhere in a cluster.
@@@ -45,16 -25,15 +28,16 @@@ public class Stream<T extends Event> im
      final static private String DEFAULT_SEPARATOR = "^";
      final static private int CAPACITY = 1000;
      private static int idCounter = 0;
-     private String name = "";
-     private Key<T> key = null;
-     private ProcessingElement[] targetPEs = null;
-     final private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(CAPACITY);
 -    final private String name;
 -    final protected Key<T> key;
 -    final private ProcessingElement[] targetPEs;
++    private String name;
++    protected Key<T> key;
++    private ProcessingElement[] targetPEs;
+     protected final BlockingQueue<EventMessage> queue = new ArrayBlockingQueue<EventMessage>(CAPACITY);
      private Thread thread;
      final private Sender sender;
      final private Receiver receiver;
-     final private int id;
+     // final private int id;
      final private App app;
 +    private Class<T> eventType = null;
  
      /**
       * Send events using a {@link KeyFinder<T>}. The key finder extracts the value of the key which is used to determine
@@@ -62,38 -41,33 +45,27 @@@
       * 
       * @param app
       *            we always register streams with the parent application.
--     * @param name
--     *            give this stream a meaningful name in the context of your application.
--     * @param finder
--     *            the finder object to find the value of the key in an event.
--     * @param processingElements
--     *            the target PE prototypes for this stream.
       */
 -    public Stream(App app, String name, KeyFinder<T> finder, ProcessingElement... processingElements) {
 +    public Stream(App app) {
-         synchronized (Stream.class) {
-             id = idCounter++;
-         }
+         // synchronized (Stream.class) {
+         // id = idCounter++;
+         // }
          this.app = app;
-         app.addStream(this, null);
- 
+         app.addStream(this);
 -        this.name = name;
 -
 -        if (finder == null) {
 -            this.key = null;
 -        } else {
 -            this.key = new Key<T>(finder, DEFAULT_SEPARATOR);
 -        }
          this.sender = app.getSender();
          this.receiver = app.getReceiver();
 -        this.targetPEs = processingElements;
      }
  
-     void start() {
- 
-         /* Get target PE prototypes for this stream. Remove null key. */
-         Set<? extends ProcessingElement> pes = Sets.newHashSet(app.getTargetPEs(this));
-         pes.remove(null);
-         targetPEs = new ProcessingElement[pes.size()];
-         pes.toArray(targetPEs);
+     public void start() {
  
 +        if (logger.isTraceEnabled()) {
-             for (ProcessingElement pe : pes) {
-                 logger.trace("Starting stream [{}] with target PE [{}].", this.getName(), pe.getName());
++            if (targetPEs != null) {
++                for (ProcessingElement pe : targetPEs) {
++                    logger.trace("Starting stream [{}] with target PE [{}].", this.getName(), pe.getName());
++                }
 +            }
 +        }
 +
          /* Start streaming. */
          thread = new Thread(this, name);
          thread.start();
@@@ -101,78 -75,18 +73,74 @@@
      }
  
      /**
-      * Stop and close this stream.
-      */
-     void close() {
-         thread.interrupt();
-     }
- 
-     /**
 -     * Send events to all available {@link ProcessingElement} instances contained by the {@link ProcessingElement}
 -     * prototypes passed to this constructor.
 +     * Name the stream.
       * 
 -     * @param app
 -     *            we always register streams with the parent application.
       * @param name
 -     *            give this stream a meaningful name in the context of your application.
 -     * @param processingElements
 -     *            the target PE prototypes for this stream.
 +     *            the stream name, default is an empty string.
 +     * @return the stream object
 +     */
 +    public Stream<T> setName(String name) {
 +        this.name = name;
 +        return this;
 +    }
 +
 +    /**
 +     * Define the key finder for this stream.
 +     * 
 +     * @param keyFinder
 +     *            a function to lookup the value of the key.
 +     * @return the stream object
 +     */
 +    public Stream<T> setKey(KeyFinder<T> keyFinder) {
 +        this.key = new Key<T>(keyFinder, DEFAULT_SEPARATOR);
 +        return this;
 +    }
 +
++    void setEventType(Class<T> type) {
++        this.eventType = type;
++    }
++
 +    /**
 +     * Define the key finder for this stream using a descriptor.
 +     * 
 +     * @param keyFinderString
 +     *            a descriptor to lookup up the value of the key.
 +     * @return the stream object
 +     */
 +    public Stream<T> setKey(String keyName) {
 +
 +        Preconditions.checkNotNull(eventType);
 +
 +        KeyFinder<T> kf = new GenericKeyFinder<T>(keyName, eventType);
 +        setKey(kf);
 +
 +        return this;
 +    }
 +
 +    /**
 +     * Send events from this stream to a PE.
 +     * 
 +     * @param pe
 +     *            a target PE.
 +     * 
 +     * @return the stream object
 +     */
 +    public Stream<T> setPE(ProcessingElement pe) {
-         app.addStream(this, pe);
++        app.addStream(this);
 +        return this;
 +    }
 +
 +    /**
 +     * Send events from this stream to various PEs.
 +     * 
 +     * @param pe
 +     *            a target PE array.
 +     * 
 +     * @return the stream object
       */
 -    public Stream(App app, String name, ProcessingElement... processingElements) {
 -        this(app, name, null, processingElements);
 +    public Stream<T> setPEs(ProcessingElement[] pes) {
-         for (int i = 0; i < pes.length; i++)
-             app.addStream(this, pes[i]);
++        this.targetPEs = pes;
 +        return this;
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 1e26512,4eae82b..8601d91
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@@ -141,11 -121,11 +121,11 @@@ public class TestAutomaticDeployment 
          if (!initial) {
              ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
              record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
-             zkClient.create("/" + clusterName + "/apps/testApp", record, CreateMode.PERSISTENT);
+             zkClient.create("/s4/clusters/" + CLUSTER_NAME + "/apps/testApp", record, CreateMode.PERSISTENT);
          }
  
--        Assert.assertTrue(signalAppInitialized.await(10, TimeUnit.SECONDS));
--        Assert.assertTrue(signalAppStarted.await(10, TimeUnit.SECONDS));
++        Assert.assertTrue(signalAppInitialized.await(20, TimeUnit.SECONDS));
++        Assert.assertTrue(signalAppStarted.await(20, TimeUnit.SECONDS));
  
          String time1 = String.valueOf(System.currentTimeMillis());
  

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
index 011f203,0000000..ff3fa02
mode 100644,000000..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestModule.java
@@@ -1,68 -1,0 +1,67 @@@
 +package org.apache.s4.deploy;
 +
 +import java.io.InputStream;
 +
 +import org.apache.commons.configuration.ConfigurationConverter;
 +import org.apache.commons.configuration.ConfigurationException;
 +import org.apache.commons.configuration.ConfigurationUtils;
 +import org.apache.commons.configuration.PropertiesConfiguration;
 +import org.apache.s4.base.Emitter;
 +import org.apache.s4.base.Hasher;
 +import org.apache.s4.base.Listener;
 +import org.apache.s4.base.SerializerDeserializer;
 +import org.apache.s4.comm.DefaultHasher;
 +import org.apache.s4.comm.serialize.KryoSerDeser;
 +import org.apache.s4.comm.tcp.TCPEmitter;
 +import org.apache.s4.comm.tcp.TCPListener;
 +import org.apache.s4.comm.topology.Assignment;
 +import org.apache.s4.comm.topology.AssignmentFromZK;
 +import org.apache.s4.comm.topology.Cluster;
- import org.apache.s4.comm.topology.Topology;
- import org.apache.s4.comm.topology.TopologyFromZK;
++import org.apache.s4.comm.topology.ClusterFromZK;
 +
 +import com.google.inject.AbstractModule;
 +import com.google.inject.Binder;
 +import com.google.inject.name.Names;
 +
 +public class TestModule extends AbstractModule {
 +
 +    private PropertiesConfiguration config;
 +
 +    private void loadProperties(Binder binder) {
 +
 +        try {
 +            InputStream is = this.getClass().getResourceAsStream("/org.apache.s4.deploy.s4.properties");
 +            config = new PropertiesConfiguration();
 +            config.load(is);
 +            System.out.println(ConfigurationUtils.toString(config));
 +            // TODO - validate properties.
 +
 +            /* Make all properties injectable. Do we need this? */
 +            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
 +        } catch (ConfigurationException e) {
 +            binder.addError(e);
 +            e.printStackTrace();
 +        }
 +    }
 +
 +    @Override
 +    protected void configure() {
 +        if (config == null) {
 +            loadProperties(binder());
 +        }
 +        bind(Cluster.class);
 +        bind(Hasher.class).to(DefaultHasher.class);
 +        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
 +        bind(Assignment.class).to(AssignmentFromZK.class);
-         bind(Topology.class).to(TopologyFromZK.class);
++        bind(Cluster.class).to(ClusterFromZK.class);
 +        bind(Emitter.class).to(TCPEmitter.class);
 +        bind(Listener.class).to(TCPListener.class);
 +
 +        bind(Integer.class).annotatedWith(Names.named("comm.retries")).toInstance(10);
 +        bind(Integer.class).annotatedWith(Names.named("comm.retry_delay")).toInstance(10);
 +        bind(Integer.class).annotatedWith(Names.named("comm.timeout")).toInstance(1000);
 +
 +        bind(Integer.class).annotatedWith(Names.named("tcp.partition.queue_size")).toInstance(256);
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/s4-edsl.gradle
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/s4-edsl.gradle
index 208c9d4,0000000..71f232e
mode 100644,000000..100644
--- a/subprojects/s4-edsl/s4-edsl.gradle
+++ b/subprojects/s4-edsl/s4-edsl.gradle
@@@ -1,51 -1,0 +1,51 @@@
 +// Use the Diezel Maven plugin to build the project.
 +// Gradle doesn't support Maven plugin as of version 1.0
 +// Useful article: http://forums.gradle.org/gradle/topics/how_to_download_and_evoke_a_maven_plugin
 +
 +def generatedSourceDir = "${buildDir}/generated-src/java"
 +def diezelSrcDir = "${projectDir}/src/main/diezel";
 +
 +dependencies {
 +    compile project(":s4-core")
-     testCompile project(path: ':s4-core', configuration: 'tests')
++    //testCompile project(path: ':s4-core', configuration: 'tests')
 +}
 +
 +
 +sourceSets {
 +    generated {
 +        java { srcDir generatedSourceDir }
 +    }
 +}
 +
 +buildscript {
 +    repositories {
 +        mavenCentral()
 +
 +        maven {
 +            url "http://oss.sonatype.org/content/repositories/snapshots"
 +        }
 +        maven {
 +            url "http://oss.sonatype.org/content/repositories/releases"
 +        }
 +    }
 +    dependencies {
 +        classpath libraries.diezel
 +    }
 +}
 +task generateSources << {
 +    outputs.dir generatedSourceDir
 +    def mojo = new net.ericaro.diezel.plugin.DiezelMojo()
 +    mojo.sourceDirectory = new File(diezelSrcDir);
 +    mojo.outputDirectory = new File(generatedSourceDir);
 +    mojo.staleMillis = 0;
 +    mojo.project = new org.apache.maven.project.MavenProject();
 +    org.apache.maven.model.Build build = new org.apache.maven.model.Build();
 +    build.setDirectory(mojo.sourceDirectory.getAbsolutePath());
 +    mojo.project.setBuild(build);
 +    mojo.execute();
 +}
 +
 +compileJava.source generateSources.outputs.files, sourceSets.main.java
 +
 +eclipseClasspath.dependsOn generateSources
 +

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
index 181b0f8,0000000..e823101
mode 100644,000000..100644
--- a/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
+++ b/subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java
@@@ -1,330 -1,0 +1,330 @@@
 +package org.apache.s4.edsl;
 +
 +import java.lang.reflect.Array;
 +import java.lang.reflect.Field;
 +import java.util.Collection;
 +import java.util.Iterator;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.apache.s4.base.Event;
 +import org.apache.s4.core.App;
 +import org.apache.s4.core.ProcessingElement;
 +import org.apache.s4.core.Stream;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.collect.LinkedListMultimap;
 +import com.google.common.collect.Multimap;
 +import com.google.common.collect.Sets;
 +
 +/**
 + * Implementation of the S4 embedded domain-specific language (EDSL).
 + * 
 + * <p>
 + * To write an app extend this class and define the application graph using a chain of methods as follows:
 + * 
 + * <pre>
 + *    final public class MyApp extends BuilderS4DSL {
 + * 
 + *     protected void onInit() {
 + * 
 + *         pe("Consumer").type(ConsumerPE.class).asSingleton().
 + *         pe("Producer").type(ProducerPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS).asSingleton().
 + *         emit(SomeEvent.class).withKey("someKey").to("Consumer").
 + *         build()
 + *    }
 + * </pre>
 + * 
 + * <p>
 + * A few things to notice:
 + * <ul>
 + * <li>Applications must extend class {@link BuilderS4DSL}
 + * <li>The graph definition is implemented in the {@link App#onInit} method which is called by the container when the
 + * application is loaded.
 + * <li>PEs are defined using strings because they need to be referenced by other parts of the graph. By doing this, we
 + * can create the whole application in a single chain of methods.
 + * <li>To assign target streams to PE fields additional information may need to be provided using the {@code onField}
 + * grammar token when there is an ambiguity. This will happen when a PE has more than one targetStream field with the
 + * same {@link Event} type. Use the construct {@code emit(SomeEvent.class).onField("streamFieldName")}. If the PE
 + * doesn't have a field named {@code "streamField"} whose stream parameter type is {@code someEvent)} then the parser
 + * will fail to build the app.
 + * <li>To configure a PE, set property values by chaining any number of {@code prop(name, value)} methods. The name
 + * should match a PE field and the value will be parsed using the type of that field.
 + * </ul>
 + * <p>
 + * Grammar:
 + * 
 + * <pre>
-  *  (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? , 
-  *  (cache, size , expires? )? , asSingleton? , (emit, onField?, 
++ *  (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
++ *  (cache, size , expires? )? , asSingleton? , (emit, onField?,
 + *  (withKey|withKeyFinder)?, to )*  )+ , build
 + * </pre>
 + * 
 + * <p>
 + * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
 + * 
 + * @author Leo Neumeyer (@leoneu)
 + */
 +public class AppBuilder extends App {
 +
 +    protected App app = this;
 +
 +    static final Logger logger = LoggerFactory.getLogger(AppBuilder.class);
 +
 +    private Multimap<ProcessingElement, StreamBuilder<? extends Event>> pe2stream = LinkedListMultimap.create();
 +    Set<StreamBuilder<? extends Event>> streamBuilders = Sets.newHashSet();
 +
 +    /* Variables used to hold values from state to state. */
 +    ProcessingElement processingElement;
 +    String peName;
 +    Class<? extends Event> triggerEventType;
 +    long triggerInterval = 0;
 +    TimeUnit triggerTimeUnit;
 +    int cacheSize;
 +    StreamBuilder<? extends Event> streamBuilder;
 +    String propertyName, propertyValue;
 +
 +    public static AppBuilder getAppBuilder() {
 +        return new BuilderS4DSL();
 +    }
 +
 +    void addProperty(String name, String value) {
 +        propertyName = name;
 +        propertyValue = value;
 +        setField();
 +    }
 +
 +    void addPe2Stream(ProcessingElement pe, StreamBuilder<? extends Event> st) {
 +        pe2stream.put(pe, st);
 +    }
 +
 +    App buildApp() {
 +
 +        /* Stream to PE writing. */
 +        for (StreamBuilder<? extends Event> sb : streamBuilders) {
 +            for (String peName : sb.pes) {
 +                ProcessingElement pe = getPE(peName);
 +                sb.stream.setPE(pe);
 +            }
 +        }
 +
 +        /* PE to Stream wiring. */
 +        Map<ProcessingElement, Collection<StreamBuilder<? extends Event>>> pe2streamMap = pe2stream.asMap();
 +        for (Map.Entry<ProcessingElement, Collection<StreamBuilder<? extends Event>>> entry : pe2streamMap.entrySet()) {
 +            ProcessingElement pe = entry.getKey();
 +            Collection<StreamBuilder<? extends Event>> streams = entry.getValue();
 +
 +            if (pe != null && streams != null) {
 +                try {
 +                    setStreamField(pe, streams);
 +                } catch (Exception e) {
 +                    logger.error("Unable to build app.", e);
 +                    return null;
 +                }
 +            }
 +        }
 +
 +        return this;
 +    }
 +
 +    /**
 +     * @param peName
 +     *            the peName to set
 +     */
 +    protected void setPeName(String peName) {
 +        this.peName = peName;
 +    }
 +
 +    /*
 +     * Cannot create an abstract class in Diezel so for now, I just implement the abstract methods here. They need to be
 +     * overloaded by the app developer.
 +     */
 +    @Override
 +    protected void onStart() {
 +    }
 +
 +    @Override
 +    protected void onInit() {
 +    }
 +
 +    @Override
 +    protected void onClose() {
 +    }
 +
 +    private <T extends ProcessingElement> void setField() {
 +
 +        logger.debug("Adding property [{}] to PE of type [{}].", propertyName, processingElement.getClass().getName());
 +
 +        Class<? extends ProcessingElement> type = processingElement.getClass();
 +
 +        try {
 +            Field f = type.getDeclaredField(propertyName);
 +            f.setAccessible(true);
 +            logger.trace("Type: {}.", f.getType());
 +            logger.trace("GenericType: {}.", f.getGenericType());
 +
 +            /* Set the field. */
 +            if (f.getType().getCanonicalName() == "long") {
 +                f.setLong(processingElement, Long.parseLong(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "int") {
 +                f.setInt(processingElement, Integer.parseInt(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "float") {
 +                f.setFloat(processingElement, Float.parseFloat(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "double") {
 +                f.setDouble(processingElement, Double.parseDouble(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "short") {
 +                f.setShort(processingElement, Short.parseShort(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "byte") {
 +                f.setByte(processingElement, Byte.parseByte(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "boolean") {
 +                f.setBoolean(processingElement, Boolean.parseBoolean(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "char") {
 +                f.setChar(processingElement, (char) Byte.parseByte(propertyValue));
 +                return;
 +            } else if (f.getType().getCanonicalName() == "java.lang.String") {
 +                f.set(processingElement, propertyValue);
 +                return;
 +            }
 +
 +            logger.error("Unable to set field named [{}] in PE of type [{}].", propertyName, type);
 +            throw new IllegalArgumentException();
 +
 +            // production code should handle these exceptions more gracefully
 +        } catch (NoSuchFieldException e) {
 +            logger.error("There is no field named [{}] in PE of type [{}].", propertyName, type);
 +        } catch (Exception e) {
 +            logger.error("Couldn't set value for field [{}] in PE of type [{}].", propertyName, type);
 +        }
 +    }
 +
 +    /* Set the stream fields in PE classes. Infer the field by checking the stream parameter type <? extends Event>. */
 +    private void setStreamField(ProcessingElement pe, Collection<StreamBuilder<? extends Event>> streams)
 +            throws Exception {
 +
 +        /*
 +         * Create a map of the stream fields to the corresponding generic type. We will use this info to assign the
 +         * streams. If the field type matches the stream type and there is no ambiguity, then the assignment is easy. If
 +         * more than one field has the same type, then then we need to do more work.
 +         */
 +        Field[] fields = pe.getClass().getDeclaredFields();
 +        Multimap<String, Field> typeMap = LinkedListMultimap.create();
 +        logger.debug("Analyzing PE [{}].", pe.getClass().getName());
 +        for (Field field : fields) {
 +            logger.trace("Field [{}] is of generic type [{}].", field.getName(), field.getGenericType());
 +
 +            if (field.getType() == Stream[].class) {
 +                logger.debug("Found stream field: {}", field.getGenericType());
 +
 +                /* Track what fields have streams with the same event type. */
 +                String key = field.getGenericType().toString();
 +                typeMap.put(key, field);
 +            }
 +        }
 +
 +        /* Assign streams to stream fields. */
 +        Multimap<Field, Stream<? extends Event>> assignment = LinkedListMultimap.create();
 +        for (StreamBuilder<? extends Event> sm : streams) {
 +
 +            Stream<? extends Event> stream = sm.stream;
 +            Class<? extends Event> eventType = sm.type;
 +            String key = Stream.class.getCanonicalName() + "<" + eventType.getCanonicalName() + ">[]";
 +            if (typeMap.containsKey(key)) {
 +                String fieldName;
 +                Field field;
 +                Collection<Field> streamFields = typeMap.get(key);
 +                int numStreamFields = streamFields.size();
 +                logger.debug("Found [{}] stream fields for type [{}].", numStreamFields, key);
 +
 +                if (numStreamFields > 1) {
 +
 +                    /*
 +                     * There is more than one field that can be used for this stream type. To resolve the ambiguity we
 +                     * need additional information. The app graph should include the name of the field that should be
 +                     * used to assign this stream. If the name is missing we bail out.
 +                     */
 +                    fieldName = sm.fieldName;
 +
 +                    /* Bail out. */
 +                    if (fieldName == null) {
 +                        String msg = String
 +                                .format("There are [%d] stream fields in PE [%s]. To assign stream [%s] you need to provide the field name in the application graph using the method withFiled(). See Javadocs for an example.",
 +                                        numStreamFields, pe.getClass().getName(), stream.getName());
 +                        logger.error(msg);
 +                        throw new Exception(msg);
 +                    }
 +
 +                    /* Use the provided field name to choose the PE field. */
 +                    field = pe.getClass().getDeclaredField(fieldName);
 +
 +                } else {
 +
 +                    /*
 +                     * The easy case, no ambiguity, we don't need an explicit field name to be provided. We have the
 +                     * field that matches the stream type.
 +                     */
 +                    Iterator<Field> iter = streamFields.iterator();
 +                    field = iter.next(); // Note that numStreamFields == 1, the size of this collection is 1.
 +                    logger.debug("Using field [{}].", field.getName());
 +                }
 +
 +                /*
 +                 * By now, we found the field to use for this stream or we bailed out. We are not ready to finish yet.
 +                 * There may be more than one stream that needs to be assigned to this field. The stream fields must be
 +                 * arrays by convention and there may be more than one stream assigned to this fields. For now we create
 +                 * a multimap from field to streams so we can construct the array in the next step.
 +                 */
 +                assignment.put(field, stream);
 +
 +            } else {
 +
 +                /* We couldn't find a match. Tell user to fix the EDSL code. */
 +                String msg = String.format(
 +                        "There is no stream of type [%s] in PE [%s]. I was unable to assign stream [%s].", key, pe
 +                                .getClass().getName(), stream.getName());
 +                logger.error(msg);
 +                throw new Exception(msg);
 +
 +            }
 +        }
 +        /* Now we construct the array and do the final assignment. */
 +
 +        Map<Field, Collection<Stream<? extends Event>>> assignmentMap = assignment.asMap();
 +        for (Map.Entry<Field, Collection<Stream<? extends Event>>> entry : assignmentMap.entrySet()) {
 +            Field f = entry.getKey();
 +
 +            int arraySize = entry.getValue().size();
 +            @SuppressWarnings("unchecked")
 +            Stream<? extends Event> streamArray[] = (Stream<? extends Event>[]) Array.newInstance(Stream.class,
 +                    arraySize);
 +            int i = 0;
 +            for (Stream<? extends Event> s : entry.getValue()) {
 +                streamArray[i++] = s;
 +
 +                f.setAccessible(true);
 +                f.set(pe, streamArray);
 +                logger.debug("Assigned [{}] streams to field [{}].", streamArray.length, f.getName());
 +            }
 +        }
 +    }
 +
 +    void clearPEState() {
 +        propertyName = null;
 +        propertyValue = null;
 +        processingElement = null;
 +        peName = null;
 +        triggerEventType = null;
 +        triggerTimeUnit = null;
 +        cacheSize = -1;
 +        streamBuilder = null;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
index 75fea88,0000000..ad40e86
mode 100644,000000..100644
--- a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/MyApp.java
@@@ -1,37 -1,0 +1,24 @@@
 +package org.apache.s4.edsl;
 +
 +import java.util.concurrent.TimeUnit;
 +
 +public class MyApp extends BuilderS4DSL {
 +
 +    @Override
 +    public void onInit() {
 +
 +        pe("PEZ").type(PEZ.class).fireOn(EventA.class).afterInterval(5, TimeUnit.SECONDS).cache().size(1000)
 +                .expires(3, TimeUnit.HOURS).emit(EventB.class).to("PEX").
 +
 +                pe("PEY").type(PEY.class).prop("duration", "4").prop("height", "99").timer()
 +                .withPeriod(2, TimeUnit.MINUTES).emit(EventA.class).onField("stream3")
 +                .withKeyFinder(DurationKeyFinder.class).to("PEZ").emit(EventA.class).onField("heightpez")
 +                .withKeyFinder(HeightKeyFinder.class).to("PEZ").
 +
 +                pe("PEX").type(PEX.class).prop("query", "money").cache().size(100).expires(1, TimeUnit.MINUTES)
 +                .asSingleton().emit(EventB.class).withKeyFinder(QueryKeyFinder.class).to("PEY", "PEZ").
 +
 +                build();
 +    }
 +
-     // Make hooks public for testing. Normally this is handled by the container.
-     public void init() {
-         super.init();
-     }
- 
-     public void start() {
-         super.start();
-     }
- 
-     public void close() {
-         super.close();
-     }
- 
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
----------------------------------------------------------------------
diff --cc subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
index dbff5f5,0000000..8225ee4
mode 100644,000000..100644
--- a/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
+++ b/subprojects/s4-edsl/src/test/java/org/apache/s4/edsl/TestEDSL.java
@@@ -1,38 -1,0 +1,46 @@@
 +package org.apache.s4.edsl;
 +
 +import java.lang.reflect.Field;
 +
++import org.apache.s4.comm.DefaultCommModule;
++import org.apache.s4.core.DefaultCoreModule;
++import org.apache.s4.fixtures.ZkBasedTest;
 +import org.junit.Test;
 +
++import com.google.common.io.Resources;
 +import com.google.inject.Guice;
 +import com.google.inject.Injector;
 +
- public class TestEDSL {
++public class TestEDSL extends ZkBasedTest {
++
++    public final static String CLUSTER_NAME = "cluster1";
 +
 +    @Test
 +    public void test() throws Exception {
-         Injector injector = Guice.createInjector(new Module());
++        Injector injector = Guice.createInjector(
++                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), CLUSTER_NAME),
++                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
 +        MyApp myApp = injector.getInstance(MyApp.class);
 +
 +        /* Normally. the container will handle this but this is just a test. */
 +        myApp.init();
 +        myApp.start();
 +        myApp.close();
 +    }
 +
 +    @Test
 +    public void testReflection() {
 +
 +        try {
 +            Class<?> c = PEY.class;
 +            Field f = c.getDeclaredField("duration");
 +            System.out.format("Type: %s%n", f.getType());
 +            System.out.format("GenericType: %s%n", f.getGenericType());
 +
 +            // production code should handle these exceptions more gracefully
 +        } catch (NoSuchFieldException x) {
 +            x.printStackTrace();
 +        }
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
----------------------------------------------------------------------
diff --cc subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
index 2019a1e,0000000..ef5cd94
mode 100644,000000..100644
--- a/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
+++ b/subprojects/s4-example/src/main/java/org/apache/s4/example/edsl/counter/CounterApp.java
@@@ -1,99 -1,0 +1,100 @@@
- package org.apache.s4.example.edsl.counter;
- 
- import java.util.concurrent.TimeUnit;
- 
- import org.apache.s4.base.Event;
- import org.apache.s4.edsl.BuilderS4DSL;
- 
- import com.google.inject.Guice;
- import com.google.inject.Injector;
- 
- /**
-  * This is a sample application to test the S4 embedded domain-specific language (EDSL).
-  * 
-  * <p>
-  * Grammar:
-  * 
-  * <pre>
-  *  (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? , 
-  *  (cache, size , expires? )? , asSingleton? , (emit, onField?, 
-  *  (withKey|withKeyFinder)?, to )*  )+ , build
-  * </pre>
-  * 
-  * <p>
-  * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
-  * 
-  */
- final public class CounterApp extends BuilderS4DSL {
- 
-     public static void main(String[] args) {
-         Injector injector = Guice.createInjector(new Module());
-         CounterApp myApp = injector.getInstance(CounterApp.class);
- 
-         /* Normally. the container will handle this but this is just a test. */
-         myApp.init();
-         myApp.start();
- 
-         try {
-             Thread.sleep(1000);
-         } catch (InterruptedException e) {
-             e.printStackTrace();
-         }
-         myApp.close();
-     }
- 
-     @Override
-     protected void onInit() {
- 
-         pe("Print").type(PrintPE.class).asSingleton().
- 
-         pe("User Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
-                 .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
- 
-                 pe("Gender Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
-                 .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
- 
-                 pe("Age Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
-                 .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
- 
-                 pe("Generate User Event").type(GenerateUserEventPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS)
-                 .asSingleton().
- 
-                 emit(UserEvent.class).withKeyFinder(UserIDKeyFinder.class).to("User Count").
- 
-                 emit(UserEvent.class).withKey("gender").to("Gender Count").
- 
-                 emit(UserEvent.class).withKeyFinder(AgeKeyFinder.class).to("Age Count").
- 
-                 build();
-     }
- 
-     /*
-      * Create and send 200 dummy events of type UserEvent.
-      * 
-      * @see io.s4.App#start()
-      */
-     @Override
-     protected void onStart() {
- 
-     }
- 
-     @Override
-     protected void onClose() {
-         System.out.println("Bye.");
-     }
- 
-     // Make hooks public for testing. Normally this is handled by the container.
-     public void init() {
-         super.init();
-     }
- 
-     public void start() {
-         super.start();
-     }
- 
-     public void close() {
-         super.close();
-     }
- 
- }
++// NOTE: this is commented until we fix the dependency to the classes generated by the edsl subproject
++
++//package org.apache.s4.example.edsl.counter;
++//
++//import java.util.concurrent.TimeUnit;
++//
++//import org.apache.s4.base.Event;
++//
++//import com.google.inject.Guice;
++//import com.google.inject.Injector;
++//
++///**
++// * This is a sample application to test the S4 embedded domain-specific language (EDSL).
++// *
++// * <p>
++// * Grammar:
++// *
++// * <pre>
++// *  (pe , type , prop* , (fireOn , afterInterval? , afterNumEvents?)? , (timer, withPeriod)? ,
++// *  (cache, size , expires? )? , asSingleton? , (emit, onField?,
++// *  (withKey|withKeyFinder)?, to )*  )+ , build
++// * </pre>
++// *
++// * <p>
++// * See the <a href="http://code.google.com/p/diezel">Diezel</a> project for details.
++// *
++// */
++//final public class CounterApp extends BuilderS4DSL {
++//
++//    public static void main(String[] args) {
++//        Injector injector = Guice.createInjector(new Module());
++//        CounterApp myApp = injector.getInstance(CounterApp.class);
++//
++//        /* Normally. the container will handle this but this is just a test. */
++//        myApp.init();
++//        myApp.start();
++//
++//        try {
++//            Thread.sleep(1000);
++//        } catch (InterruptedException e) {
++//            e.printStackTrace();
++//        }
++//        myApp.close();
++//    }
++//
++//    @Override
++//    protected void onInit() {
++//
++//        pe("Print").type(PrintPE.class).asSingleton().
++//
++//        pe("User Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
++//                .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
++//
++//                pe("Gender Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
++//                .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
++//
++//                pe("Age Count").type(CounterPE.class).fireOn(Event.class).afterInterval(100, TimeUnit.MILLISECONDS)
++//                .emit(CountEvent.class).withKeyFinder(CountKeyFinder.class).to("Print").
++//
++//                pe("Generate User Event").type(GenerateUserEventPE.class).timer().withPeriod(1, TimeUnit.MILLISECONDS)
++//                .asSingleton().
++//
++//                emit(UserEvent.class).withKeyFinder(UserIDKeyFinder.class).to("User Count").
++//
++//                emit(UserEvent.class).withKey("gender").to("Gender Count").
++//
++//                emit(UserEvent.class).withKeyFinder(AgeKeyFinder.class).to("Age Count").
++//
++//                build();
++//    }
++//
++//    /*
++//     * Create and send 200 dummy events of type UserEvent.
++//     *
++//     * @see io.s4.App#start()
++//     */
++//    @Override
++//    protected void onStart() {
++//
++//    }
++//
++//    @Override
++//    protected void onClose() {
++//        System.out.println("Bye.");
++//    }
++//
++//    // Make hooks public for testing. Normally this is handled by the container.
++//    public void init() {
++//        super.init();
++//    }
++//
++//    public void start() {
++//        super.start();
++//    }
++//
++//    public void close() {
++//        super.close();
++//    }
++//
++// }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/121f3306/test-apps/twitter-counter/src/main/resources/default.s4.properties
----------------------------------------------------------------------
diff --cc test-apps/twitter-counter/src/main/resources/default.s4.properties
index 0000000,d5da3f3..16fc4ba
mode 000000,100644..100644
--- a/test-apps/twitter-counter/src/main/resources/default.s4.properties
+++ b/test-apps/twitter-counter/src/main/resources/default.s4.properties
@@@ -1,0 -1,15 +1,19 @@@
+ comm.queue_emmiter_size = 8000
+ comm.queue_listener_size = 8000
++comm.retries=10
++comm.retry_delay=10
++comm.timeout=1000
++tcp.partition.queue_size=256
+ cluster.hosts = localhost
+ cluster.ports = 5077
+ cluster.name = s4-adapter-cluster
+ cluster.zk_address = localhost:21810
+ cluster.zk_session_timeout = 10000
+ cluster.zk_connection_timeout = 10000
+ comm.module = org.apache.s4.core.CustomModule
+ s4.logger_level = DEBUG
+ appsDir=/tmp/deploy-test
+ tcp.partition.queue_size=1000
+ comm.timeout=100
+ comm.retry_delay=100
+ comm.retries=10