You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/07 03:28:08 UTC

[08/18] samza-hello-samza git commit: Updated with changes in apache/samza master for Samza 1.0

Updated with changes in apache/samza master for Samza 1.0

Author: Prateek Maheshwari <pm...@apache.org>

Closes #36 from prateekm/latest


Project: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/commit/2d956496
Tree: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/tree/2d956496
Diff: http://git-wip-us.apache.org/repos/asf/samza-hello-samza/diff/2d956496

Branch: refs/heads/master
Commit: 2d956496fa68c514d087399167d0f07672bcac45
Parents: 195e181
Author: Prateek Maheshwari <pm...@apache.org>
Authored: Sat Oct 13 15:43:27 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Sat Oct 13 15:43:27 2018 -0700

----------------------------------------------------------------------
 bin/deploy.sh                                   |  2 +-
 gradle.properties                               |  2 +-
 pom.xml                                         |  4 +-
 .../samza/examples/azure/AzureApplication.java  | 19 ++-----
 .../samza/examples/cookbook/FilterExample.java  |  8 +--
 .../samza/examples/cookbook/JoinExample.java    |  8 +--
 .../cookbook/RemoteTableJoinExample.java        | 15 +++---
 .../examples/cookbook/SessionWindowExample.java |  8 +--
 .../cookbook/StreamTableJoinExample.java        | 10 ++--
 .../cookbook/TumblingWindowExample.java         |  8 +--
 .../application/WikipediaApplication.java       | 24 ++++-----
 .../system/WikipediaInputDescriptor.java        | 41 ---------------
 .../system/WikipediaSystemDescriptor.java       | 51 -------------------
 .../descriptors/WikipediaInputDescriptor.java   | 42 ++++++++++++++++
 .../descriptors/WikipediaSystemDescriptor.java  | 53 ++++++++++++++++++++
 .../task/WikipediaStatsStreamTask.java          | 11 ++--
 16 files changed, 149 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/bin/deploy.sh
----------------------------------------------------------------------
diff --git a/bin/deploy.sh b/bin/deploy.sh
index 5b50079..3c3ada2 100755
--- a/bin/deploy.sh
+++ b/bin/deploy.sh
@@ -23,4 +23,4 @@ base_dir=`pwd`
 
 mvn clean package
 mkdir -p $base_dir/deploy/samza
-tar -xvf $base_dir/target/hello-samza-0.15.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza
+tar -xvf $base_dir/target/hello-samza-1.0.0-SNAPSHOT-dist.tar.gz -C $base_dir/deploy/samza

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/gradle.properties
----------------------------------------------------------------------
diff --git a/gradle.properties b/gradle.properties
index 37f8eb7..34a540a 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-SAMZA_VERSION=0.15.0-SNAPSHOT
+SAMZA_VERSION=1.0.0-SNAPSHOT
 KAFKA_VERSION=0.11.0.2
 HADOOP_VERSION=2.6.1
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a41be49..8659683 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@ under the License.
 
   <groupId>org.apache.samza</groupId>
   <artifactId>hello-samza</artifactId>
-  <version>0.15.0-SNAPSHOT</version>
+  <version>1.0.0-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>Samza Example</name>
   <description>
@@ -153,7 +153,7 @@ under the License.
   <properties>
     <!-- maven specific properties -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <samza.version>0.15.0-SNAPSHOT</samza.version>
+    <samza.version>1.0.0-SNAPSHOT</samza.version>
     <hadoop.version>2.6.1</hadoop.version>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/azure/AzureApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/azure/AzureApplication.java b/src/main/java/samza/examples/azure/AzureApplication.java
index 12d293b..e2c337f 100644
--- a/src/main/java/samza/examples/azure/AzureApplication.java
+++ b/src/main/java/samza/examples/azure/AzureApplication.java
@@ -19,31 +19,24 @@
 
 package samza.examples.azure;
 
-import java.util.HashMap;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.descriptors.GenericInputDescriptor;
-import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
-import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.serializers.ByteSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.system.descriptors.GenericOutputDescriptor;
+import org.apache.samza.system.descriptors.GenericSystemDescriptor;
 
 public class AzureApplication implements StreamApplication {
-
-  // Inputs
   private static final String INPUT_STREAM_ID = "input-stream";
-
-  // Outputs
   private static final String OUTPUT_STREAM_ID = "output-stream";
 
   @Override
   public void describe(StreamApplicationDescriptor appDescriptor) {
-    HashMap<String, String> systemConfigs = new HashMap<>();
-
     GenericSystemDescriptor systemDescriptor =
         new GenericSystemDescriptor("eventhubs", "org.apache.samza.system.eventhub.EventHubSystemFactory");
 
@@ -55,13 +48,9 @@ public class AzureApplication implements StreamApplication {
     GenericOutputDescriptor<KV<String, byte[]>> outputDescriptor =
         systemDescriptor.getOutputDescriptor(OUTPUT_STREAM_ID, serde);
 
-
-    // Input
     MessageStream<KV<String, byte[]>> eventhubInput = appDescriptor.getInputStream(inputDescriptor);
-    // Output
     OutputStream<KV<String, byte[]>> eventhubOutput = appDescriptor.getOutputStream(outputDescriptor);
 
-    // Send
     eventhubInput
         .filter((message) -> message.getKey() != null)
         .map((message) -> {

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/FilterExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/FilterExample.java b/src/main/java/samza/examples/cookbook/FilterExample.java
index bcf5b18..bd14300 100644
--- a/src/main/java/samza/examples/cookbook/FilterExample.java
+++ b/src/main/java/samza/examples/cookbook/FilterExample.java
@@ -19,16 +19,16 @@
 package samza.examples.cookbook;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/JoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/JoinExample.java b/src/main/java/samza/examples/cookbook/JoinExample.java
index 05a358d..14753eb 100644
--- a/src/main/java/samza/examples/cookbook/JoinExample.java
+++ b/src/main/java/samza/examples/cookbook/JoinExample.java
@@ -20,7 +20,7 @@ package samza.examples.cookbook;
 
 import java.io.Serializable;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -28,9 +28,9 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
index 386cdda..1c27cda 100644
--- a/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
+++ b/src/main/java/samza/examples/cookbook/RemoteTableJoinExample.java
@@ -29,20 +29,20 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.table.Table;
-import org.apache.samza.table.caching.CachingTableDescriptor;
-import org.apache.samza.table.remote.RemoteTableDescriptor;
+import org.apache.samza.table.caching.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.remote.TableReadFunction;
+import org.apache.samza.table.remote.descriptors.RemoteTableDescriptor;
 import org.apache.samza.util.ExponentialSleepStrategy;
 import org.apache.samza.util.HttpUtil;
 import org.codehaus.jackson.JsonFactory;
@@ -131,8 +131,7 @@ public class RemoteTableJoinExample implements StreamApplication {
             .withReadRateLimit(10)
             .withReadFunction(new StockPriceReadFunction());
     CachingTableDescriptor<String, Double> cachedRemoteTableDescriptor =
-        new CachingTableDescriptor<String, Double>("cached-remote-table")
-            .withTable(remoteTableDescriptor)
+        new CachingTableDescriptor<>("cached-remote-table", remoteTableDescriptor)
             .withReadTtl(Duration.ofSeconds(5));
     Table<KV<String, Double>> cachedRemoteTable = appDescriptor.getTable(cachedRemoteTableDescriptor);
 

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/SessionWindowExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/SessionWindowExample.java b/src/main/java/samza/examples/cookbook/SessionWindowExample.java
index bfdf188..1db0808 100644
--- a/src/main/java/samza/examples/cookbook/SessionWindowExample.java
+++ b/src/main/java/samza/examples/cookbook/SessionWindowExample.java
@@ -20,7 +20,7 @@ package samza.examples.cookbook;
 
 import java.io.Serializable;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -29,9 +29,9 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
index 4d5c57e..d9f6acf 100644
--- a/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
+++ b/src/main/java/samza/examples/cookbook/StreamTableJoinExample.java
@@ -19,7 +19,7 @@
 package samza.examples.cookbook;
 
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -28,10 +28,10 @@ import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.storage.kv.RocksDbTableDescriptor;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.storage.kv.descriptors.RocksDbTableDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.apache.samza.table.Table;
 
 import com.google.common.collect.ImmutableList;

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/cookbook/TumblingWindowExample.java b/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
index 51c2056..5ec6876 100644
--- a/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
+++ b/src/main/java/samza/examples/cookbook/TumblingWindowExample.java
@@ -20,7 +20,7 @@ package samza.examples.cookbook;
 
 import java.io.Serializable;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
@@ -29,9 +29,9 @@ import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.kafka.KafkaInputDescriptor;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
index 734df96..60bbe15 100644
--- a/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
+++ b/src/main/java/samza/examples/wikipedia/application/WikipediaApplication.java
@@ -22,29 +22,28 @@ package samza.examples.wikipedia.application;
 import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import org.apache.samza.application.StreamApplication;
-import org.apache.samza.application.StreamApplicationDescriptor;
-import org.apache.samza.config.Config;
+import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.functions.SupplierFunction;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.kafka.KafkaOutputDescriptor;
-import org.apache.samza.system.kafka.KafkaSystemDescriptor;
-import org.apache.samza.task.TaskContext;
+import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
+import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableMap;
 import samza.examples.wikipedia.model.WikipediaParser;
 import samza.examples.wikipedia.system.WikipediaFeed.WikipediaFeedEvent;
-import samza.examples.wikipedia.system.WikipediaInputDescriptor;
-import samza.examples.wikipedia.system.WikipediaSystemDescriptor;
+import samza.examples.wikipedia.system.descriptors.WikipediaInputDescriptor;
+import samza.examples.wikipedia.system.descriptors.WikipediaSystemDescriptor;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -141,13 +140,14 @@ public class WikipediaApplication implements StreamApplication, Serializable {
 
     /**
      * {@inheritDoc}
-     * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Config, TaskContext)} to
+     * Override {@link org.apache.samza.operators.functions.InitableFunction#init(Context)} to
      * get a KeyValueStore for persistence and the MetricsRegistry for metrics.
      */
     @Override
-    public void init(Config config, TaskContext context) {
-      store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
-      repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+    public void init(Context context) {
+      TaskContext taskContext = context.getTaskContext();
+      store = (KeyValueStore<String, Integer>) taskContext.getStore("wikipedia-stats");
+      repeatEdits = taskContext.getTaskMetricsRegistry().newCounter("edit-counters", "repeat-edits");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java
deleted file mode 100644
index 92de60d..0000000
--- a/src/main/java/samza/examples/wikipedia/system/WikipediaInputDescriptor.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import samza.examples.wikipedia.application.WikipediaApplication;
-
-
-public class WikipediaInputDescriptor extends InputDescriptor<WikipediaFeed.WikipediaFeedEvent, WikipediaInputDescriptor> {
-  // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized.
-  private static final Serde SERDE = new NoOpSerde();
-
-  WikipediaInputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
-    super(streamId, SERDE, systemDescriptor, null);
-  }
-
-  public WikipediaInputDescriptor withChannel(String channel) {
-    withPhysicalName(channel);
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java b/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java
deleted file mode 100644
index 6f50196..0000000
--- a/src/main/java/samza/examples/wikipedia/system/WikipediaSystemDescriptor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package samza.examples.wikipedia.system;
-
-import java.util.Map;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-
-public class WikipediaSystemDescriptor extends SystemDescriptor<WikipediaSystemDescriptor> {
-  private static final String SYSTEM_NAME = "wikipedia";
-  private static final String FACTORY_CLASS_NAME = WikipediaSystemFactory.class.getName();
-  private static final String HOST_KEY = "systems.%s.host";
-  private static final String PORT_KEY = "systems.%s.port";
-
-  private final String host;
-  private final int port;
-
-  public WikipediaSystemDescriptor(String host, int port) {
-    super(SYSTEM_NAME, FACTORY_CLASS_NAME, null, null);
-    this.host = host;
-    this.port = port;
-  }
-
-  public WikipediaInputDescriptor getInputDescriptor(String streamId) {
-    return new WikipediaInputDescriptor(streamId, this);
-  }
-
-  @Override
-  public Map<String, String> toConfig() {
-    Map<String, String> configs = super.toConfig();
-    configs.put(String.format(HOST_KEY, getSystemName()), host);
-    configs.put(String.format(PORT_KEY, getSystemName()), Integer.toString(port));
-    return configs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java
new file mode 100644
index 0000000..29c7b92
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaInputDescriptor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system.descriptors;
+
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+import samza.examples.wikipedia.system.WikipediaFeed;
+
+
+public class WikipediaInputDescriptor extends InputDescriptor<WikipediaFeed.WikipediaFeedEvent, WikipediaInputDescriptor> {
+  // Messages come from WikipediaConsumer so we know that they don't have a key and don't need to be deserialized.
+  private static final Serde SERDE = new NoOpSerde();
+
+  WikipediaInputDescriptor(String streamId, SystemDescriptor systemDescriptor) {
+    super(streamId, SERDE, systemDescriptor, null);
+  }
+
+  public WikipediaInputDescriptor withChannel(String channel) {
+    withPhysicalName(channel);
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java
new file mode 100644
index 0000000..9f516fd
--- /dev/null
+++ b/src/main/java/samza/examples/wikipedia/system/descriptors/WikipediaSystemDescriptor.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samza.examples.wikipedia.system.descriptors;
+
+import samza.examples.wikipedia.system.WikipediaSystemFactory;
+
+import java.util.Map;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+
+public class WikipediaSystemDescriptor extends SystemDescriptor<WikipediaSystemDescriptor> {
+  private static final String SYSTEM_NAME = "wikipedia";
+  private static final String FACTORY_CLASS_NAME = WikipediaSystemFactory.class.getName();
+  private static final String HOST_KEY = "systems.%s.host";
+  private static final String PORT_KEY = "systems.%s.port";
+
+  private final String host;
+  private final int port;
+
+  public WikipediaSystemDescriptor(String host, int port) {
+    super(SYSTEM_NAME, FACTORY_CLASS_NAME, null, null);
+    this.host = host;
+    this.port = port;
+  }
+
+  public WikipediaInputDescriptor getInputDescriptor(String streamId) {
+    return new WikipediaInputDescriptor(streamId, this);
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> configs = super.toConfig();
+    configs.put(String.format(HOST_KEY, getSystemName()), host);
+    configs.put(String.format(PORT_KEY, getSystemName()), Integer.toString(port));
+    return configs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza-hello-samza/blob/2d956496/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
----------------------------------------------------------------------
diff --git a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
index abe760a..897f9f1 100644
--- a/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
+++ b/src/main/java/samza/examples/wikipedia/task/WikipediaStatsStreamTask.java
@@ -23,7 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.samza.config.Config;
+import org.apache.samza.context.Context;
+import org.apache.samza.context.TaskContext;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -32,7 +33,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
-import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.WindowableTask;
 
@@ -48,9 +48,10 @@ public class WikipediaStatsStreamTask implements StreamTask, InitableTask, Windo
   // Example metric. Running counter of the number of repeat edits of the same title within a single window.
   private Counter repeatEdits;
 
-  public void init(Config config, TaskContext context) {
-    this.store = (KeyValueStore<String, Integer>) context.getStore("wikipedia-stats");
-    this.repeatEdits = context.getMetricsRegistry().newCounter("edit-counters", "repeat-edits");
+  public void init(Context context) {
+    TaskContext taskContext = context.getTaskContext();
+    this.store = (KeyValueStore<String, Integer>) taskContext.getStore("wikipedia-stats");
+    this.repeatEdits = taskContext.getTaskMetricsRegistry().newCounter("edit-counters", "repeat-edits");
   }
 
   @SuppressWarnings("unchecked")