You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/04/12 08:46:04 UTC
[2/2] git commit: fixed windowing PE initialization + minor fix in
tcp emitter for availability of nodes
- Fixed windowing PE initialization.
- Minor fix in the TCP emitter for node availability: it now logs an error and returns false when the cluster has no nodes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/454450c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/454450c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/454450c7
Branch: refs/heads/S4-22
Commit: 454450c73126f116dd081f5100d54b521bc9a54e
Parents: 593d04b
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Apr 10 19:51:56 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Apr 10 19:51:56 2012 +0200
----------------------------------------------------------------------
.../java/org/apache/s4/comm/tcp/TCPEmitter.java | 4 ++
.../src/main/java/org/apache/s4/core/App.java | 35 +++++++++---
.../java/org/apache/s4/core/ProcessingElement.java | 43 +++++++++------
.../src/main/java/org/apache/s4/core/Server.java | 6 ++-
.../main/java/org/apache/s4/core/WindowingPE.java | 21 +++-----
5 files changed, 67 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 25a0470..90b4686 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -131,6 +131,10 @@ public class TCPEmitter implements Emitter, ChannelFutureListener, TopologyChang
ClusterNode clusterNode = partitionNodeMap.get(partitionId);
if (clusterNode == null) {
+ if (topology.getTopology().getNodes().size() == 0) {
+ logger.error("No node in cluster ");
+ return false;
+ }
clusterNode = topology.getTopology().getNodes().get(partitionId);
partitionNodeMap.forcePut(partitionId, clusterNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 9f74b39..d33e718 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -1,24 +1,23 @@
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.core;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.s4.base.Event;
@@ -28,14 +27,13 @@ import org.apache.s4.comm.serialize.KryoSerDeser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
/*
- * Container base class to hold all processing elements. We will implement administrative methods here.
+ * Container base class to hold all processing elements. We will implement administrative methods here.
*/
public abstract class App {
@@ -312,15 +310,34 @@ public abstract class App {
try {
// TODO: make sure this doesn't crash if PE has a constructor other than with App as argument.
Class<?>[] types = new Class<?>[] { App.class };
- T pe = type.getDeclaredConstructor(types).newInstance(this);
- return pe;
-
+ try {
+ T pe = type.getDeclaredConstructor(types).newInstance(this);
+ return pe;
+ } catch (NoSuchMethodException e) {
+ // no such constructor. Use the setter
+ T pe = type.getDeclaredConstructor(new Class[] {}).newInstance();
+ pe.setApp(this);
+ return pe;
+ }
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
}
+ public <T extends WindowingPE<?>> T createWindowingPE(Class<T> type, long slotDuration, TimeUnit timeUnit,
+ int numSlots) {
+ try {
+ Class<?>[] types = new Class<?>[] { App.class, long.class, TimeUnit.class, int.class };
+ T pe = type.getDeclaredConstructor(types).newInstance(
+ new Object[] { this, slotDuration, timeUnit, numSlots });
+ return pe;
+ } catch (Exception e) {
+ logger.error("Cannot instantiate pe for class [{}]", type.getName(), e);
+ return null;
+ }
+ }
+
static private String toString(ProcessingElement pe) {
return pe != null ? pe.getClass().getName() + " " : "null ";
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index ee4c72f..99d6f1f 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,17 +1,17 @@
/*
* Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.core;
@@ -80,9 +80,9 @@ import com.google.common.collect.Maps;
* public class MyPE extends ProcessingElement {
*
* private Map<String, Integer> wordCount;
- *
+ *
* ...
- *
+ *
* onCreate() {
* wordCount = new HashMap<String, Integer>;
* logger.trace("Created a map for instance PE with id {}, getId());
@@ -130,15 +130,6 @@ public abstract class ProcessingElement implements Cloneable {
private transient OverloadDispatcher overloadDispatcher;
protected ProcessingElement() {
- }
-
- /**
- * Create a PE prototype. By default, PE instances will never expire. Use {@code #configurePECache} to configure.
- *
- * @param app
- * the app that contains this PE
- */
- public ProcessingElement(App app) {
OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
Class<?> overloadDispatcherClass = oldg.generate();
try {
@@ -146,10 +137,6 @@ public abstract class ProcessingElement implements Cloneable {
} catch (Exception e) {
throw new RuntimeException(e);
}
-
- this.app = app;
- app.addPEPrototype(this, null);
-
peInstances = CacheBuilder.newBuilder().build(new CacheLoader<String, ProcessingElement>() {
@Override
public ProcessingElement load(String key) throws Exception {
@@ -167,6 +154,17 @@ public abstract class ProcessingElement implements Cloneable {
}
/**
+ * Create a PE prototype. By default, PE instances will never expire. Use {@code #configurePECache} to configure.
+ *
+ * @param app
+ * the app that contains this PE
+ */
+ public ProcessingElement(App app) {
+ this();
+ setApp(app);
+ }
+
+ /**
* This method is called by the PE timer. By default it is synchronized with the {@link #onEvent()} and
* {@link #onTrigger()} methods. To execute concurrently with other methods, the {@link ProcessingElelment} subclass
* must be annotated with {@link @ThreadSafe}.
@@ -197,6 +195,15 @@ public abstract class ProcessingElement implements Cloneable {
return app;
}
+ public void setApp(App app) {
+ if (this.app != null) {
+ throw new RuntimeException("Application was already assigne to this processing element");
+ }
+ this.app = app;
+ app.addPEPrototype(this, null);
+
+ }
+
/**
* Returns the approximate number of PE instances from the cache.
*
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
index ff07e44..d148868 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java
@@ -91,6 +91,11 @@ public class Server {
// /* After some indirection we get the injector. */
// injector = Guice.createInjector(module);
+ if (!new File(appsDir).exists()) {
+ if (!new File(appsDir).mkdirs()) {
+ logger.error("Cannot create apps directory [{}]", appsDir);
+ }
+ }
File[] s4rFiles = new File(appsDir).listFiles(new PatternFilenameFilter("\\w+\\.s4r"));
for (File s4rFile : s4rFiles) {
loadApp(s4rFile);
@@ -202,7 +207,6 @@ public class Server {
return null;
}
-
App previous = apps.put(appName, app);
logger.info("Loaded application from file {}", s4r.getAbsolutePath());
signalOneAppLoaded.countDown();
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/454450c7/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
index 4c3e931..5418b83 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/WindowingPE.java
@@ -1,18 +1,18 @@
/*
* Copyright (c) 2011 The S4 Project, http://s4.io.
* All rights reserved.
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific
* language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
+ * License. See accompanying LICENSE file.
*/
package org.apache.s4.core;
@@ -23,7 +23,6 @@ import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections15.buffer.CircularFifoBuffer;
-import org.apache.s4.base.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,14 +85,6 @@ public abstract class WindowingPE<T> extends ProcessingElement {
this(app, 0l, null, numSlots);
}
- abstract protected void onEvent(Event event);
-
- abstract public void onTrigger(Event event);
-
- abstract public void onCreate();
-
- abstract public void onRemove();
-
/**
* This method is called at periodic intervals when a new slot must be put into the buffer. The concrete class must
* implement the logic required to create a slot. For example, compute statistics from aggregations and get
@@ -118,10 +109,13 @@ public abstract class WindowingPE<T> extends ProcessingElement {
logger.error("Calling method addSlot() in a periodic window is not allowed.");
return;
}
+ circularBuffer.add(slot);
+ }
+
+ protected void onCreate() {
if (circularBuffer == null) {
circularBuffer = new CircularFifoBuffer<T>(numSlots);
}
- circularBuffer.add(slot);
}
/**
@@ -143,7 +137,6 @@ public abstract class WindowingPE<T> extends ProcessingElement {
* @return the collection of slots
*/
protected Collection<T> getSlots() {
-
return circularBuffer;
}