You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/04/12 08:46:04 UTC

[2/2] git commit: fixed windowing PE initialization; minor fix in TCP emitter for availability of nodes

Fixed windowing PE initialization.
Also includes a minor fix in the TCP emitter for the availability of nodes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/454450c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/454450c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/454450c7

Branch: refs/heads/S4-22
Commit: 454450c73126f116dd081f5100d54b521bc9a54e
Parents: 593d04b
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Apr 10 19:51:56 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Apr 10 19:51:56 2012 +0200

----------------------------------------------------------------------
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    4 ++
 .../src/main/java/org/apache/s4/core/App.java      |   35 +++++++++---
 .../java/org/apache/s4/core/ProcessingElement.java |   43 +++++++++------
 .../src/main/java/org/apache/s4/core/Server.java   |    6 ++-
 .../main/java/org/apache/s4/core/WindowingPE.java  |   21 +++-----
 5 files changed, 67 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 25a0470..90b4686 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -131,6 +131,10 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
         ClusterNode clusterNode = partitionNodeMap.get(partitionId);
 
         if (clusterNode == null) {
+            if (topology.getTopology().getNodes().size() == 0) {
+                logger.error("No node in cluster ");
+                return false;
+            }
             clusterNode = topology.getTopology().getNodes().get(partitionId);
             partitionNodeMap.forcePut(partitionId, clusterNode);
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 9f74b39..d33e718 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -1,24 +1,23 @@
 /*
  * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.core;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
@@ -28,14 +27,13 @@ import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 
 /*
- * Container base class to hold all processing elements. We will implement administrative methods here. 
+ * Container base class to hold all processing elements. We will implement administrative methods here.
  */
 public abstract class App {
 
@@ -312,15 +310,34 @@ public abstract class App {
         try {
             // TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.
             Class<?>[] types = new Class<?>[] { App.class };
-            T pe = type.getDeclaredConstructor(types).newInstance(this);
-            return pe;
-
+            try {
+                T pe = type.getDeclaredConstructor(types).newInstance(this);
+                return pe;
+            } catch (NoSuchMethodException e) {
+                // no such constructor. Use the setter
+                T pe = type.getDeclaredConstructor(new Class[] {}).newInstance();
+                pe.setApp(this);
+                return pe;
+            }
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
             return null;
         }
     }
 
+    public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
+            int numSlots) {
+        try {
+            Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+            T pe = type.getDeclaredConstructor(types).newInstance(
+                    new Object[] { this, slotDuration, timeUnit, numSlots });
+            return pe;
+        } catch (Exception e) {
+            logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);
+            return null;
+        }
+    }
+
     static private String toString(ProcessingElement pe) {
         return pe != null ? pe.getClass().getName() + " " : "null ";
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index ee4c72f..99d6f1f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,17 +1,17 @@
 /*
  * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.core;
 
@@ -80,9 +80,9 @@ import com.google.common.collect.Maps;
  *         public class MyPE extends ProcessingElement {
  * 
  *           private Map<String, Integer> wordCount;
- *         
+ * 
  *           ...
- *         
+ * 
  *           onCreate() {
  *           wordCount = new HashMap<String, Integer>;
  *           logger.trace("Created a map for instance PE with id {}, getId());
@@ -130,15 +130,6 @@ public abstract class ProcessingElement implements Cloneable {
     private transient OverloadDispatcher overloadDispatcher;
 
     protected ProcessingElement() {
-    }
-
-    /**
-     * Create a PE prototype. By default, PE instances will never expire. Use {@code #configurePECache} to configure.
-     * 
-     * @param app
-     *            the app that contains this PE
-     */
-    public ProcessingElement(App app) {
         OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
         Class<?> overloadDispatcherClass = oldg.generate();
         try {
@@ -146,10 +137,6 @@ public abstract class ProcessingElement implements Cloneable {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-
-        this.app = app;
-        app.addPEPrototype(this, null);
-
         peInstances = CacheBuilder.newBuilder().build(new CacheLoader<String, ProcessingElement>() {
             @Override
             public ProcessingElement load(String key) throws Exception {
@@ -167,6 +154,17 @@ public abstract class ProcessingElement implements Cloneable {
     }
 
     /**
+     * Create a PE prototype. By default, PE instances will never expire. Use {@code #configurePECache} to configure.
+     * 
+     * @param app
+     *            the app that contains this PE
+     */
+    public ProcessingElement(App app) {
+        this();
+        setApp(app);
+    }
+
+    /**
      * This method is called by the PE timer. By default it is synchronized with the {@link #onEvent()} and
      * {@link #onTrigger()} methods. To execute concurrently with other methods, the {@link ProcessingElelment} subclass
      * must be annotated with {@link @ThreadSafe}.
@@ -197,6 +195,15 @@ public abstract class ProcessingElement implements Cloneable {
         return app;
     }
 
+    public void setApp(App app) {
+        if (this.app != null) {
+            throw new RuntimeException("Application was already assigne to this processing element");
+        }
+        this.app = app;
+        app.addPEPrototype(this, null);
+
+    }
+
     /**
      * Returns the approximate number of PE instances from the cache.
      * 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index ff07e44..d148868 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -91,6 +91,11 @@ public class Server {
         // /* After some indirection we get the injector. */
         // injector = Guice.createInjector(module);
 
+        if (!new File(appsDir).exists()) {
+            if (!new File(appsDir).mkdirs()) {
+                logger.error("Cannot create apps directory [{}]", appsDir);
+            }
+        }
         File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
         for (File s4rFile : s4rFiles) {
             loadApp(s4rFile);
@@ -202,7 +207,6 @@ public class Server {
                 return null;
             }
 
-
             App previous = apps.put(appName, app);
             logger.info("Loaded application from file {}", s4r.getAbsolutePath());
             signalOneAppLoaded.countDown();

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
index 4c3e931..5418b83 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
@@ -1,18 +1,18 @@
 /*
  * Copyright (c) 2011 The S4 Project, http://s4.io.
  * All rights reserved.
- * 
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  *          http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
  * either express or implied. See the License for the specific
  * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
+ * License. See accompanying LICENSE file.
  */
 package org.apache.s4.core;
 
@@ -23,7 +23,6 @@ import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.apache.s4.base.Event;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,14 +85,6 @@ public abstract class WindowingPE<T> extends ProcessingElement {
         this(app, 0l, null, numSlots);
     }
 
-    abstract protected void onEvent(Event event);
-
-    abstract public void onTrigger(Event event);
-
-    abstract public void onCreate();
-
-    abstract public void onRemove();
-
     /**
      * This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
      * implement the logic required to create a slot. For example, compute statistics from aggregations and get
@@ -118,10 +109,13 @@ public abstract class WindowingPE<T> extends ProcessingElement {
             logger.error("Calling method addSlot() in a periodic window is not allowed.");
             return;
         }
+        circularBuffer.add(slot);
+    }
+
+    protected void onCreate() {
         if (circularBuffer == null) {
             circularBuffer = new CircularFifoBuffer<T>(numSlots);
         }
-        circularBuffer.add(slot);
     }
 
     /**
@@ -143,7 +137,6 @@ public abstract class WindowingPE<T> extends ProcessingElement {
      * @return the collection of slots
      */
     protected Collection<T> getSlots() {
-
         return circularBuffer;
     }