You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/01 22:40:46 UTC
[15/24] git commit: remove other code and only multilang metircs
remove other code and only multilang metircs
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/a4b26af6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/a4b26af6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/a4b26af6
Branch: refs/heads/master
Commit: a4b26af6044b3774fec6224a74b8b3f2b994d535
Parents: 22a6ca9
Author: JuDasheng <ju...@meituan.com>
Authored: Fri Jun 13 11:49:42 2014 +0800
Committer: JuDasheng <ju...@meituan.com>
Committed: Fri Jun 13 11:49:42 2014 +0800
----------------------------------------------------------------------
build.sh | 9 -
examples/storm-starter/pom.xml | 2 +-
external/storm-kafka/pom.xml | 2 +-
patch/STORM-132_PULL-36.patch | 31 --
pom.xml | 20 +-
.../maven-shade-clojure-transformer/pom.xml | 4 +-
storm-core/dependency-reduced-pom.xml | 359 -------------------
storm-core/pom.xml | 4 -
.../src/clj/backtype/storm/daemon/executor.clj | 67 +---
.../clj/backtype/storm/daemon/supervisor.clj | 5 +-
.../src/clj/backtype/storm/daemon/task.clj | 4 +-
.../backtype/storm/scheduler/EvenScheduler.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 74 ++--
storm-core/src/jvm/backtype/storm/Config.java | 6 -
storm-core/src/multilang/py/storm.py | 17 +-
storm-core/src/ui/public/topology.html | 6 +-
storm-dist/binary/pom.xml | 4 -
17 files changed, 72 insertions(+), 549 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/build.sh
----------------------------------------------------------------------
diff --git a/build.sh b/build.sh
deleted file mode 100755
index 2563c33..0000000
--- a/build.sh
+++ /dev/null
@@ -1,9 +0,0 @@
-#!/bin/bash
-
-mvn clean install -DskipTests=true
-
-cd storm-dist/binary
-
-mvn package
-
-cd -
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index b56aa92..903c6e7 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index b7aaccc..4972619 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/patch/STORM-132_PULL-36.patch
----------------------------------------------------------------------
diff --git a/patch/STORM-132_PULL-36.patch b/patch/STORM-132_PULL-36.patch
deleted file mode 100644
index 005ac26..0000000
--- a/patch/STORM-132_PULL-36.patch
+++ /dev/null
@@ -1,31 +0,0 @@
-From 6b275d95fbdbc8374a215ecb2551f0fca3438d81 Mon Sep 17 00:00:00 2001
-From: Kang Xiao <kx...@gmail.com>
-Date: Tue, 18 Feb 2014 23:23:50 +0800
-Subject: [PATCH] STORM-132 sort supervisor by free slot in desending order in
- EvenScheduler to schedule more evenly between supervisor
-
----
- storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj | 7 ++++++-
- 1 file changed, 6 insertions(+), 1 deletion(-)
-
-diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-index 28b9202..828606d 100644
---- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
-@@ -22,7 +22,12 @@
- :implements [backtype.storm.scheduler.IScheduler]))
-
- (defn sort-slots [all-slots]
-- (let [split-up (vals (group-by first all-slots))]
-+ (let [split-up
-+ (map second
-+ (reverse
-+ (sort
-+ (for [[host ports] (group-by first all-slots)]
-+ [(count ports) ports]))))]
- (apply interleave-all split-up)
- ))
-
---
-1.8.5.1
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c07a471..b7286dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,17 +18,16 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <!--
<parent>
<groupId>org.apache</groupId>
<artifactId>apache</artifactId>
<version>10</version>
</parent>
- -->
+
<groupId>org.apache.storm</groupId>
<artifactId>storm</artifactId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<packaging>pom</packaging>
<name>Storm</name>
<description>Distributed and fault-tolerant realtime computation</description>
@@ -282,16 +281,11 @@
</profiles>
<distributionManagement>
- <repository>
- <id>nexus-releases</id>
- <name>nexus-releases</name>
- <url>http://nexus.sankuai.com:8081/nexus/content/repositories/releases</url>
- </repository>
- <snapshotRepository>
- <id>nexus-snapshots</id>
- <name>nexus-snapshots</name>
- <url>http://nexus.sankuai.com:8081/nexus/content/repositories/snapshots</url>
- </snapshotRepository>
+ <site>
+ <id>storm.maven.website</id>
+ <name>Storm Website</name>
+ <url>file:///tmp/site</url>
+ </site>
</distributionManagement>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-buildtools/maven-shade-clojure-transformer/pom.xml
----------------------------------------------------------------------
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index 1c944d5..a6fbad1 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
+ <version>0.9.3-incubating-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -36,4 +36,4 @@
<scope>provided</scope>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/dependency-reduced-pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/dependency-reduced-pom.xml b/storm-core/dependency-reduced-pom.xml
deleted file mode 100644
index 9dacd73..0000000
--- a/storm-core/dependency-reduced-pom.xml
+++ /dev/null
@@ -1,359 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>0.9.2-incubating-mt0000</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <name>Storm Core</name>
- <description>Storm Core Java API and Clojure implementation.</description>
- <build>
- <sourceDirectory>src/jvm</sourceDirectory>
- <resources>
- <resource>
- <directory>../conf</directory>
- </resource>
- <resource>
- <targetPath>META-INF</targetPath>
- <directory>../</directory>
- <includes>
- <include>NOTICE</include>
- </includes>
- </resource>
- </resources>
- <testResources>
- <testResource>
- <directory>src/dev</directory>
- </testResource>
- <testResource>
- <directory>test/resources</directory>
- </testResource>
- </testResources>
- <plugins>
- <plugin>
- <groupId>com.theoryinpractise</groupId>
- <artifactId>clojure-maven-plugin</artifactId>
- <extensions>true</extensions>
- <executions>
- <execution>
- <id>compile-clojure</id>
- <phase>compile</phase>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>test-clojure</id>
- <phase>test</phase>
- <goals>
- <goal>test-with-junit</goal>
- </goals>
- <configuration>
- <vmargs>${test.extra.args}</vmargs>
- </configuration>
- </execution>
- </executions>
- <configuration>
- <sourceDirectories>
- <sourceDirectory>src/clj</sourceDirectory>
- </sourceDirectories>
- <testSourceDirectories>
- <testSourceDirectory>test/clj</testSourceDirectory>
- </testSourceDirectories>
- <warnOnReflection>false</warnOnReflection>
- <copyDeclaredNamespaceOnly>true</copyDeclaredNamespaceOnly>
- <copiedNamespaces>
- <copiedNamespace>none</copiedNamespace>
- </copiedNamespaces>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-surefire-report-plugin</artifactId>
- <configuration>
- <reportsDirectories>
- <file>${project.build.directory}/test-reports</file>
- </reportsDirectories>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-shade-plugin</artifactId>
- <version>2.2</version>
- <executions>
- <execution>
- <phase>package</phase>
- <goals>
- <goal>shade</goal>
- </goals>
- </execution>
- </executions>
- <dependencies>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>maven-shade-clojure-transformer</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- <configuration>
- <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
- <promoteTransitiveDependencies>false</promoteTransitiveDependencies>
- <createDependencyReducedPom>true</createDependencyReducedPom>
- <minimizeJar>false</minimizeJar>
- <artifactSet>
- <includes>
- <include>org.apache.thrift:*</include>
- <include>org.apache.storm:*</include>
- </includes>
- </artifactSet>
- <relocations>
- <relocation>
- <pattern>org.apache.thrift</pattern>
- <shadedPattern>org.apache.thrift7</shadedPattern>
- </relocation>
- </relocations>
- <transformers>
- <transformer />
- </transformers>
- <filters>
- <filter>
- <artifact>org.apache.thrift:*</artifact>
- <excludes>
- <exclude>META-INF/LICENSE.txt</exclude>
- <exclude>META-INF/NOTICE.txt</exclude>
- </excludes>
- </filter>
- </filters>
- </configuration>
- </plugin>
- </plugins>
- </build>
- <dependencies>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>clojure</artifactId>
- <version>1.5.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>clj-time</groupId>
- <artifactId>clj-time</artifactId>
- <version>0.4.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>compojure</groupId>
- <artifactId>compojure</artifactId>
- <version>1.1.3</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>hiccup</groupId>
- <artifactId>hiccup</artifactId>
- <version>0.3.6</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>ring</groupId>
- <artifactId>ring-devel</artifactId>
- <version>0.3.11</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>ring</groupId>
- <artifactId>ring-jetty-adapter</artifactId>
- <version>0.3.11</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>tools.logging</artifactId>
- <version>0.2.3</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>math.numeric-tower</artifactId>
- <version>0.0.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>tools.cli</artifactId>
- <version>0.2.4</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.clojure</groupId>
- <artifactId>tools.nrepl</artifactId>
- <version>0.2.3</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <artifactId>clojure</artifactId>
- <groupId>org.clojure</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>clojure-complete</groupId>
- <artifactId>clojure-complete</artifactId>
- <version>0.2.3</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <artifactId>clojure</artifactId>
- <groupId>org.clojure</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.4</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-exec</artifactId>
- <version>1.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>2.5</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.7.0</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>slf4j-api</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.curator</groupId>
- <artifactId>curator-framework</artifactId>
- <version>2.4.0</version>
- <scope>compile</scope>
- <exclusions>
- <exclusion>
- <artifactId>log4j</artifactId>
- <groupId>log4j</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.googlecode.json-simple</groupId>
- <artifactId>json-simple</artifactId>
- <version>1.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>carbonite</artifactId>
- <version>1.4.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- <version>1.11</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- <version>4.3.3</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.googlecode.disruptor</groupId>
- <artifactId>disruptor</artifactId>
- <version>2.10.1</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.jgrapht</groupId>
- <artifactId>jgrapht-core</artifactId>
- <version>0.9.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>13.0</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <version>1.0.6</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>1.6.6</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.6.3.Final</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.9.5</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.clojars.runa</groupId>
- <artifactId>conjure</artifactId>
- <version>2.1.3</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>reply</groupId>
- <artifactId>reply</artifactId>
- <version>0.3.0</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <artifactId>cd-client</artifactId>
- <groupId>org.thnetos</groupId>
- </exclusion>
- <exclusion>
- <artifactId>drawbridge</artifactId>
- <groupId>com.cemerick</groupId>
- </exclusion>
- <exclusion>
- <artifactId>versioneer</artifactId>
- <groupId>trptcolin</groupId>
- </exclusion>
- <exclusion>
- <artifactId>sjacket</artifactId>
- <groupId>org.clojars.trptcolin</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
-
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 91cd370..134eeb8 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,11 +20,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
-<<<<<<< HEAD
- <version>0.9.2-incubating-mt0000</version>
-=======
<version>0.9.3-incubating-SNAPSHOT</version>
->>>>>>> upstream/master
</parent>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 64e60be..1bbe53d 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -18,7 +18,6 @@
(:use [backtype.storm bootstrap])
(:import [backtype.storm.hooks ITaskHook])
(:import [backtype.storm.tuple Tuple])
- (:import [backtype.storm.tuple MessageId])
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
@@ -390,25 +389,12 @@
(let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
task-ids (:task-ids executor-data)
debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
- component-id (:component-id executor-data)
- executor-id (:executor-id executor-data)
- executor-type (:type executor-data)
]
(disruptor/clojure-handler
(fn [tuple-batch sequence-id end-of-batch?]
(fast-list-iter [[task-id msg] tuple-batch]
- (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))
- tuple-streamid (.getSourceStreamId tuple)
- tuple-source (.getSourceComponent tuple)
- tuple-id (.getMessageId tuple)
- tuple-values (.getValues tuple)
- ]
- (when debug?
- (if (= tuple-streamid "default")
- (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-id "] TupleValue[" tuple-values "]")
- (log-message "Component[" component-id "] Type[RECV] from Stream[" tuple-streamid "] Source[" tuple-source "] TupleId[" tuple-values "]")
- )
- )
+ (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
+ (when debug? (log-message "Processing received message " tuple))
(if task-id
(tuple-action-fn task-id tuple)
;; null task ids are broadcast tuples
@@ -435,7 +421,6 @@
last-active (atom false)
spouts (ArrayList. (map :object (vals task-datas)))
rand (Random. (Utils/secureRandomLong))
- debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
pending (RotatingMap.
2 ;; microoptimize for performance of .size method
@@ -443,12 +428,9 @@
(expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
- (when debug?
- (log-message "Component[" component-id "] FAILED-TUPLE reason[EXPIRED] TupleId[" msg-id "] values[" tuple-info "]"))
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
- (let [stream-id (.getSourceStreamId tuple)
- tuple-id (.getMessageId tuple)]
+ (let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
@@ -459,18 +441,10 @@
(throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(condp = stream-id
- ACKER-ACK-STREAM-ID (do
- (ack-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
- (when debug?
- (log-message "Component[" component-id "] ACK-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
- )
- ACKER-FAIL-STREAM-ID (do
- (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
- (when debug?
- (log-message "Component[" component-id "] FAILED-TUPLE reason[RECV] TupleId[" id "] values[" tuple-finished-info "]"))
- )
+ ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
+ spout-id tuple-finished-info time-delta)
+ ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
+ spout-id tuple-finished-info time-delta)
)))
;; TODO: on failure, emit tuple to failure stream
))))
@@ -519,8 +493,6 @@
(transfer-fn out-task
out-tuple
overflow-buffer)
- (when debug?
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" out-stream-id "] TupleId[" tuple-id "] values[" values "]"))
))
(if rooted?
(do
@@ -626,8 +598,6 @@
{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
open-or-prepare-was-called?]} executor-data
rand (Random. (Utils/secureRandomLong))
- debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
-
tuple-action-fn (fn [task-id ^TupleImpl tuple]
;; synchronization needs to be done with a key provided by this bolt, otherwise:
;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -690,13 +660,7 @@
(tasks-fn task stream values)
(tasks-fn stream values))]
(fast-list-iter [t out-tasks]
- (let [anchors-to-ids (HashMap.)
- out-tuple (TupleImpl. worker-context
- values
- task-id
- stream
- (MessageId/makeId anchors-to-ids))
- ]
+ (let [anchors-to-ids (HashMap.)]
(fast-list-iter [^TupleImpl a anchors]
(let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
(when (pos? (count root-ids))
@@ -705,15 +669,12 @@
(fast-list-iter [root-id root-ids]
(put-xor! anchors-to-ids root-id edge-id))
))))
- (transfer-fn t out-tuple)
- (when debug?
- (if (= component-id "__acker")
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.get values 0) "]")
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" (.getMessageId out-tuple) "] values[" values "]")
- )
-
- )
- ))
+ (transfer-fn t
+ (TupleImpl. worker-context
+ values
+ task-id
+ stream
+ (MessageId/makeId anchors-to-ids)))))
(or out-tasks [])))]]
(builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
(if (= component-id Constants/SYSTEM_COMPONENT_ID)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index 53b2802..7566a79 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -473,11 +473,8 @@
topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
(substitute-worker-childopts s port))
logfilename (str "worker-" port ".log")
- worker-childcgroup (when-let [s (conf WORKER-CHILDCGROUP)]
- (.split s " "))
command (concat
- worker-childcgroup
- [(java-cmd) "-server"]
+ [(java-cmd) "-server"]
worker-childopts
topo-worker-childopts
[(str "-Djava.library.path=" jlp)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/task.clj b/storm-core/src/clj/backtype/storm/daemon/task.clj
index 29756a1..3650150 100644
--- a/storm-core/src/clj/backtype/storm/daemon/task.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -131,7 +131,7 @@
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
+ (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
(let [target-component (.getComponentId worker-context out-task-id)
component->grouping (get stream->component->grouper stream)
grouping (get component->grouping target-component)
@@ -149,7 +149,7 @@
))
([^String stream ^List values]
(when debug?
- (log-message "Component[" component-id "] Type[EMIT] to Stream[" stream "] TupleId[" values "]"))
+ (log-message "Emitting: " component-id " " stream " " values))
(let [out-tasks (ArrayList.)]
(fast-map-iter [[out-component grouper] (get stream->component->grouper stream)]
(when (= :direct grouper)
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
index 828606d..28b9202 100644
--- a/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
+++ b/storm-core/src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -22,12 +22,7 @@
:implements [backtype.storm.scheduler.IScheduler]))
(defn sort-slots [all-slots]
- (let [split-up
- (map second
- (reverse
- (sort
- (for [[host ports] (group-by first all-slots)]
- [(count ports) ports]))))]
+ (let [split-up (vals (group-by first all-slots))]
(apply interleave-all split-up)
))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 4098038..5f2bcba 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -780,43 +780,43 @@
(let [id (url-decode id)
component (url-decode component)]
(json-response (component-page id component (:window m) (check-include-sys? (:sys m))))))
- ;(POST "/api/v1/topology/:id/activate" [id]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)]
- ; (.activate nimbus name)
- ; (log-message "Activating topology '" name "'")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
-
- ;(POST "/api/v1/topology/:id/deactivate" [id]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)]
- ; (.deactivate nimbus name)
- ; (log-message "Deactivating topology '" name "'")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
- ;(POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)
- ; options (RebalanceOptions.)]
- ; (.set_wait_secs options (Integer/parseInt wait-time))
- ; (.rebalance nimbus name options)
- ; (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
- ;(POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
- ; (with-nimbus nimbus
- ; (let [id (url-decode id)
- ; tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
- ; name (.get_name tplg)
- ; options (KillOptions.)]
- ; (.set_wait_secs options (Integer/parseInt wait-time))
- ; (.killTopologyWithOpts nimbus name options)
- ; (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
- ; (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/activate" [id]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.activate nimbus name)
+ (log-message "Activating topology '" name "'")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+
+ (POST "/api/v1/topology/:id/deactivate" [id]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)]
+ (.deactivate nimbus name)
+ (log-message "Deactivating topology '" name "'")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/rebalance/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (RebalanceOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.rebalance nimbus name options)
+ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/api/v1/topology/" id)))
+ (POST "/api/v1/topology/:id/kill/:wait-time" [id wait-time]
+ (with-nimbus nimbus
+ (let [id (url-decode id)
+ tplg (.getTopologyInfo ^Nimbus$Client nimbus id)
+ name (.get_name tplg)
+ options (KillOptions.)]
+ (.set_wait_secs options (Integer/parseInt wait-time))
+ (.killTopologyWithOpts nimbus name options)
+ (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
+ (resp/redirect (str "/api/v1/topology/" id)))
(GET "/" [:as {cookies :cookies}]
(resp/redirect "/index.html"))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 531fa14..ff309a5 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -475,12 +475,6 @@ public class Config extends HashMap<String, Object> {
public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
/**
- * The cgroup opts provided to workers launched by this supervisor.
- */
- public static final String WORKER_CHILDCGROUP = "worker.childcgroup";
- public static final Object WORKER_CHILDCGROUP_SCHEMA = String.class;
-
- /**
* control how many worker receiver threads we need per worker
*/
public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
index a4c8c2c..9965c81 100755
--- a/storm-core/src/multilang/py/storm.py
+++ b/storm-core/src/multilang/py/storm.py
@@ -33,12 +33,10 @@ json_decode = lambda x: json.loads(x)
def readMsg():
msg = ""
while True:
- line = sys.stdin.readline()
- if not line:
- raise Exception('Read EOF from stdin')
- if line[0:-1] == "end":
+ line = sys.stdin.readline()[0:-1]
+ if line == "end":
break
- msg = msg + line
+ msg = msg + line + "\n"
return json_decode(msg[0:-1])
MODE = None
@@ -137,7 +135,7 @@ def reportError(msg):
def log(msg):
sendMsgToParent({"command": "log", "msg": msg})
-
+
def rpcMetrics(name, params):
sendMsgToParent({"command": "metrics", "name": name, "params": params})
@@ -182,13 +180,6 @@ class BasicBolt(object):
def initialize(self, stormconf, context):
pass
- def redirect_stdout_to_stderr(self):
- self.bakup_stdout = sys.stdout
- sys.stdout = sys.stderr
-
- def recover_stdout(self):
- sys.stdout = self.bakup_stdout
-
def process(self, tuple):
pass
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-core/src/ui/public/topology.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/topology.html b/storm-core/src/ui/public/topology.html
index 2ebab42..1020719 100644
--- a/storm-core/src/ui/public/topology.html
+++ b/storm-core/src/ui/public/topology.html
@@ -36,13 +36,11 @@
<h2>Topology summary</h2>
<div id="topology-summary">
</div>
-<!--
<div id="topology-actions">
<h2 class="js-only">Topology actions</h2>
<p id="topology-actions" class="js-only">
</p>
</div>
--->
<div id="topology-stats"></div>
<div id="spout-stats">
</div>
@@ -79,13 +77,13 @@ $(document).ready(function() {
var spoutStats = $("#spout-stats");
var boltStats = $("#bolt-stats");
var config = $("#topology-configuration");
- //var topologyActions = $("#topology-actions");
+ var topologyActions = $("#topology-actions");
var topologyVisualization = $("#topology-visualization")
var formattedConfig = formatConfigData(response["configuration"]);
var buttonJsonData = topologyActionJson(response["id"],response["name"],response["status"],response["msgTimeout"]);
$.get("/templates/topology-page-template.html", function(template) {
topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
- //topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
+ topologyActions.append(Mustache.render($(template).filter("#topology-actions-template").html(),buttonJsonData));
topologyStats.append(Mustache.render($(template).filter("#topology-stats-template").html(),response));
$("#topology-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
spoutStats.append(Mustache.render($(template).filter("#spout-stats-template").html(),response));
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a4b26af6/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 464ecd7..0d97c0b 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,11 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
-<<<<<<< HEAD
- <version>0.9.2-incubating-mt0000</version>
-=======
<version>0.9.3-incubating-SNAPSHOT</version>
->>>>>>> upstream/master
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>