You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/07 03:28:08 UTC
[08/18] samza-hello-samza git commit: Updated with changes in
apache/samza master for Samza 1.0
Updated with changes in apache/samza master for Samza 1.0
Author: Prateek Maheshwari <pm...@apache.org>
Closes #36 from prateekm/latest
Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/2d956496
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/2d956496
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/2d956496
Branch: refs/heads/master
Commit: 2d956496fa68c514d087399167d0f07672bcac45
Parents: 195e181
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Sat Oct 13 15:43:27 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Sat Oct 13 15:43:27 2018 -0700
----------------------------------------------------------------------
bin/deploy.sh | 2 +-
gradle.properties | 2 +-
pom.xml | 4 +-
.../samza/examples/azure/AzureApplication.java | 19 ++-----
.../samza/examples/cookbook/FilterExample.java | 8 +--
.../samza/examples/cookbook/JoinExample.java | 8 +--
.../cookbook/RemoteTableJoinExample.java | 15 +++---
.../examples/cookbook/SessionWindowExample.java | 8 +--
.../cookbook/StreamTableJoinExample.java | 10 ++--
.../cookbook/TumblingWindowExample.java | 8 +--
.../application/WikipediaApplication.java | 24 ++++-----
.../system/WikipediaInputDescriptor.java | 41 ---------------
.../system/WikipediaSystemDescriptor.java | 51 -------------------
.../descriptors/WikipediaInputDescriptor.java | 42 ++++++++++++++++
.../descriptors/WikipediaSystemDescriptor.java | 53 ++++++++++++++++++++
.../task/WikipediaStatsStreamTask.java | 11 ++--
16 files changed, 149 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 5b50079..3c3ada2 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@ base_dir=`pwd`
mvn clean package
mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-0.15.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 37f8eb7..34a540a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
* under the License.
*/
-SAMZA_VERSION=0.15.0-SNAPSHOT
+SAMZA_VERSION=1.0.0-SNAPSHOT
KAFKA_VERSION=0.11.0.2
HADOOP_VERSION=2.6.1
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a41be49..8659683 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
<groupId>org.apache.samza</groupId>
<artifactId>hello-samza</artifactId>
- <version>0.15.0-SNAPSHOT</version>
+ <version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Samza Example</name>
<description>
@@ -153,7 +153,7 @@ under the License.
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <samza.version>0.15.0-SNAPSHOT</samza.version>
+ <samza.version>1.0.0-SNAPSHOT</samza.version>
<hadoop.version>2.6.1</hadoop.version>
</properties>
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/azure/AzureApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
index 12d293b..e2c337f 100644
--- a/src/main/java/samza/examples/azure/AzureApplication.java
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -19,31 +19,24 @@
package samza.examples.azure;
-import java.util.HashMap;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
import org.apache.samza.serializers.ByteSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
public class AzureApplication implements StreamApplication {
-
- // Inputs
private static final String INPUT_STREAM_ID = "input-stream";
-
- // Outputs
private static final String OUTPUT_STREAM_ID = "output-stream";
@Override
public void describe(StreamApplicationDescriptor appDescriptor) {
- HashMap<String, String> systemConfigs = new HashMap<>();
-
GenericSystemDescriptor systemDescriptor =
new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory");
@@ -55,13 +48,9 @@ public class AzureApplication implements StreamApplication {
GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor =
systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde);
-
- // Input
MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
- // Output
OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
- // Send
eventhubInput
.filter((message) -> message.getKey() != null)
.map((message) -> {
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/FilterExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/FilterExample.java b/src/main/java/samza/examples/cookbook/FilterExample.java
index bcf5b18..bd14300 100644
--- a/src/main/java/samza/examples/cookbook/FilterExample.java
+++ b/src/main/java/samza/examples/cookbook/FilterExample.java
@@ -19,16 +19,16 @@
package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/JoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/JoinExample.java b/src/main/java/samza/examples/cookbook/JoinExample.java
index 05a358d..14753eb 100644
--- a/src/main/java/samza/examples/cookbook/JoinExample.java
+++ b/src/main/java/samza/examples/cookbook/JoinExample.java
@@ -20,7 +20,7 @@ package samza.examples.cookbook;
import java.io.Serializable;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -28,9 +28,9 @@ import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
index 386cdda..1c27cda 100644
--- a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
+++ b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
@@ -29,20 +29,20 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.StreamTableJoinFunction;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.table.Table;
-import org.apache.samza.table.caching.CachingTableDescriptor;
-import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.caching.descriptors.CachingTableDescriptor;
import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.descriptors.RemoteTableDescriptor;
import org.apache.samza.util.ExponentialSleepStrategy;
import org.apache.samza.util.HttpUtil;
import org.codehaus.jackson.JsonFactory;
@@ -131,8 +131,7 @@ public class RemoteTableJoinExample implements StreamApplication {
.withReadRateLimit(10)
.withReadFunction(new StockPriceReadFunction());
CachingTableDescriptor<String, Double> cachedRemoteTableDescriptor =
- new CachingTableDescriptor<String, Double>("cached-remote-table")
- .withTable(remoteTableDescriptor)
+ new CachingTableDescriptor<>("cached-remote-table", remoteTableDescriptor)
.withReadTtl(Duration.ofSeconds(5));
Table<KV<String, Double>> cachedRemoteTable = appDescriptor.getTable(cachedRemoteTableDescriptor);
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/SessionWindowExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/SessionWindowExample.java b/src/main/java/samza/examples/cookbook/SessionWindowExample.java
index bfdf188..1db0808 100644
--- a/src/main/java/samza/examples/cookbook/SessionWindowExample.java
+++ b/src/main/java/samza/examples/cookbook/SessionWindowExample.java
@@ -20,7 +20,7 @@ package samza.examples.cookbook;
import java.io.Serializable;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -29,9 +29,9 @@ import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
index 4d5c57e..d9f6acf 100644
--- a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
+++ b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
@@ -19,7 +19,7 @@
package samza.examples.cookbook;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -28,10 +28,10 @@ import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.table.Table;
import com.google.common.collect.ImmutableList;
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingWindowExample.java b/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
index 51c2056..5ec6876 100644
--- a/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
+++ b/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
@@ -20,7 +20,7 @@ package samza.examples.cookbook;
import java.io.Serializable;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
@@ -29,9 +29,9 @@ import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 734df96..60bbe15 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -22,29 +22,28 @@ package samza.examples.wikipedia.application;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
import org.apache.samza.metrics.Counter;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.functions.SupplierFunction;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import samza.examples.wikipedia.model.WikipediaParser;
import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-import samza.examples.wikipedia.system.WikipediaInputDescriptor;
-import samza.examples.wikipedia.system.WikipediaSystemDescriptor;
+import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor;
+import samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -141,13 +140,14 @@ public class WikipediaApplication implements StreamApplication, Serializable {
/**
* {@inheritDoc}
- * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to
+ * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Context)} to
* get a KeyValueStore for persistence and the MetricsRegistry for metrics.
*/
@Override
- public void init(Config config, TaskContext context) {
- store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
- repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+ public void init(Context context) {
+ TaskContext taskContext = context.getTaskContext();
+ store = (KeyValueStore<String, Integer>) taskContext.getStore("wikipedia-stats");
+ repeatEdits = taskContext.getTaskMetricsRegistry().newCounter("edit-counters", "repeat-edits");
}
@Override
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java
deleted file mode 100644
index 92de60d..0000000
--- a/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import samza.examples.wikipedia.application.WikipediaApplication;
-
-
-public class WikipediaInputDescriptor extends InputDescriptor<WikipediaFeed.WikipediaFeedEvent, WikipediaInputDescriptor> {
- // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized.
- private static final Serde SERDE = new NoOpSerde();
-
- WikipediaInputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
- super(streamId, SERDE, systemDescriptor, null);
- }
-
- public WikipediaInputDescriptor withChannel(String channel) {
- withPhysicalName(channel);
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java
deleted file mode 100644
index 6f50196..0000000
--- a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import java.util.Map;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-
-public class WikipediaSystemDescriptor extends SystemDescriptor<WikipediaSystemDescriptor> {
- private static final String SYSTEM_NAME = "wikipedia";
- private static final String FACTORY_CLASS_NAME = WikipediaSystemFactory.class.getName();
- private static final String HOST_KEY = "systems.%s.host";
- private static final String PORT_KEY = "systems.%s.port";
-
- private final String host;
- private final int port;
-
- public WikipediaSystemDescriptor(String host, int port) {
- super(SYSTEM_NAME, FACTORY_CLASS_NAME, null, null);
- this.host = host;
- this.port = port;
- }
-
- public WikipediaInputDescriptor getInputDescriptor(String streamId) {
- return new WikipediaInputDescriptor(streamId, this);
- }
-
- @Override
- public Map<String, String> toConfig() {
- Map<String, String> configs = super.toConfig();
- configs.put(String.format(HOST_KEY, getSystemName()), host);
- configs.put(String.format(PORT_KEY, getSystemName()), Integer.toString(port));
- return configs;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java
new file mode 100644
index 0000000..29c7b92
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system.descriptors;
+
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+import samza.examples.wikipedia.system.WikipediaFeed;
+
+
+public class WikipediaInputDescriptor extends InputDescriptor<WikipediaFeed.WikipediaFeedEvent, WikipediaInputDescriptor> {
+ // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized.
+ private static final Serde SERDE = new NoOpSerde();
+
+ WikipediaInputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+ super(streamId, SERDE, systemDescriptor, null);
+ }
+
+ public WikipediaInputDescriptor withChannel(String channel) {
+ withPhysicalName(channel);
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java
new file mode 100644
index 0000000..9f516fd
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system.descriptors;
+
+import samza.examples.wikipedia.system.WikipediaSystemFactory;
+
+import java.util.Map;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+public class WikipediaSystemDescriptor extends SystemDescriptor<WikipediaSystemDescriptor> {
+ private static final String SYSTEM_NAME = "wikipedia";
+ private static final String FACTORY_CLASS_NAME = WikipediaSystemFactory.class.getName();
+ private static final String HOST_KEY = "systems.%s.host";
+ private static final String PORT_KEY = "systems.%s.port";
+
+ private final String host;
+ private final int port;
+
+ public WikipediaSystemDescriptor(String host, int port) {
+ super(SYSTEM_NAME, FACTORY_CLASS_NAME, null, null);
+ this.host = host;
+ this.port = port;
+ }
+
+ public WikipediaInputDescriptor getInputDescriptor(String streamId) {
+ return new WikipediaInputDescriptor(streamId, this);
+ }
+
+ @Override
+ public Map<String, String> toConfig() {
+ Map<String, String> configs = super.toConfig();
+ configs.put(String.format(HOST_KEY, getSystemName()), host);
+ configs.put(String.format(PORT_KEY, getSystemName()), Integer.toString(port));
+ return configs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
index abe760a..897f9f1 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -23,7 +23,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
import org.apache.samza.metrics.Counter;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -32,7 +33,6 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;
@@ -48,9 +48,10 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo
// Example metric. Running counter of the number of repeat edits of the same title within a single window.
private Counter repeatEdits;
- public void init(Config config, TaskContext context) {
- this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
- this.repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+ public void init(Context context) {
+ TaskContext taskContext = context.getTaskContext();
+ this.store = (KeyValueStore<String, Integer>) taskContext.getStore("wikipedia-stats");
+ this.repeatEdits = taskContext.getTaskMetricsRegistry().newCounter("edit-counters", "repeat-edits");
}
@SuppressWarnings("unchecked")