You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/13 01:34:49 UTC

[09/12] samza git commit: Consolidating package names for System, Stream, Application and Table descriptors.

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java
deleted file mode 100644
index e59b4d0..0000000
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsOutputDescriptor.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-
-/**
- * A descriptor for an Event Hubs output stream
- * <p>
- *   An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
- * </p>
- * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
- * in configuration.
- *
- * @param <StreamMessageType> type of messages in this stream
- */
-public class EventHubsOutputDescriptor<StreamMessageType>
-    extends OutputDescriptor<StreamMessageType, EventHubsOutputDescriptor<StreamMessageType>> {
-  private String namespace;
-  private String entityPath;
-  private Optional<String> sasKeyName = Optional.empty();
-  private Optional<String> sasToken = Optional.empty();
-
-  /**
-   * Constructs an {@link OutputDescriptor} instance.
-   *
-   * @param streamId id of the stream
-   * @param namespace namespace for the Event Hubs entity to produce to, not null
-   * @param entityPath entity path for the Event Hubs entity to produce to, not null
-   * @param serde serde for messages in the stream
-   * @param systemDescriptor system descriptor this stream descriptor was obtained from
-   */
-  EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
-      SystemDescriptor systemDescriptor) {
-    super(streamId, serde, systemDescriptor);
-    this.namespace = StringUtils.stripToNull(namespace);
-    this.entityPath = StringUtils.stripToNull(entityPath);
-    if (this.namespace == null || this.entityPath == null) {
-      throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
-          + "system: {%s}, stream: {%s}", getSystemName(), streamId));
-    }
-  }
-
-  /**
-   * SAS Key name of the associated output stream. Required to access the output Event Hubs entity per stream.
-   *
-   * @param sasKeyName the name of the SAS key required to access the Event Hubs entity
-   * @return this output descriptor
-   */
-  public EventHubsOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
-    this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
-    return this;
-  }
-
-  /**
-   * SAS Token of the associated output stream. Required to access the output Event Hubs per stream.
-   *
-   * @param sasToken the SAS token required to access to Event Hubs entity
-   * @return this output descriptor
-   */
-  public EventHubsOutputDescriptor<StreamMessageType> withSasKey(String sasToken) {
-    this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
-    return this;
-  }
-
-  @Override
-  public Map<String, String> toConfig() {
-    HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
-
-    String streamId = getStreamId();
-
-    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId), namespace);
-    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId), entityPath);
-    sasKeyName.ifPresent(keyName ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
-    sasToken.ifPresent(key ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
-    return ehConfigs;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java
deleted file mode 100644
index 189340f..0000000
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubsSystemDescriptor.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
-
-
-/**
- * A descriptor for a Event Hubs system.
- * <p>
- * System properties configured using a descriptor override corresponding properties provided in configuration.
- */
-public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
-  private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
-
-  private List<String> streamIds = new ArrayList<>();
-  private Optional<Integer> fetchRuntimeInfoTimeout = Optional.empty();
-  private Optional<Integer> numClientThreads = Optional.empty();
-  private Optional<Integer> consumerReceiveQueueSize = Optional.empty();
-  private Optional<Integer> consumerMaxEventCountPerPoll = Optional.empty();
-  private Optional<Integer> consumerPrefetchCount = Optional.empty();
-  private Optional<Boolean> producerEventhubsSendKey = Optional.empty();
-  private Optional<PartitioningMethod> producerEventhubsPartitioningMethod = Optional.empty();
-
-  /**
-   * Constructs a {@link SystemDescriptor} instance.
-   *  @param systemName name of this system
-   */
-  public EventHubsSystemDescriptor(String systemName) {
-    super(systemName, FACTORY_CLASS_NAME, null, null);
-  }
-
-  /**
-   * Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
-   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
-   * <p>
-   * The type of messages in the stream is the type of the provided stream level serde.
-   *
-   * @param streamId id of the input stream
-   * @param namespace namespace of the Event Hubs entity to consume from
-   * @param entityPath entity path of the Event Hubs entity to consume from
-   * @param serde stream level serde for the input stream
-   * @param <StreamMessageType> type of messages in this stream
-   * @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
-   */
-  public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
-      String entityPath, Serde<StreamMessageType> serde) {
-    streamIds.add(streamId);
-    return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
-  }
-
-  /**
-   * Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
-   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
-   * <p>
-   * The type of messages in the stream is the type of the provided stream level serde.
-   *
-   * @param streamId id of the output stream
-   * @param namespace namespace of the Event Hubs entity to produce to
-   * @param entityPath entity path of the Event Hubs entity to produce to
-   * @param serde stream level serde for the output stream
-   * @param <StreamMessageType> type of the messages in this stream
-   * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
-   */
-  public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
-      String entityPath, Serde<StreamMessageType> serde) {
-    streamIds.add(streamId);
-    return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
-  }
-
-  /**
-   * Timeout for fetching the runtime metadata from an Event Hubs entity on startup in millis.
-   *
-   * @param timeoutMS the timeout in ms for getting runtime information from the Event Hubs system
-   * @return this system descriptor
-   */
-  public EventHubsSystemDescriptor withRuntimeInfoTimeout(int timeoutMS) {
-    this.fetchRuntimeInfoTimeout = Optional.of(timeoutMS);
-    return this;
-  }
-
-  /**
-   * Number of threads in thread pool that will be used by the EventHubClient.
-   *
-   * @param numClientThreads the number of threads
-   * @return this system descriptor
-   */
-  public EventHubsSystemDescriptor withNumClientThreads(int numClientThreads) {
-    this.numClientThreads = Optional.of(numClientThreads);
-    return this;
-  }
-
-  /**
-   *  Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages.
-   *  Larger buffer capacity typically leads to better throughput but consumes more memory.
-   *
-   * @param receiveQueueSize the number of messages from Event Hubs that should be buffered in the
-   *                      {@link org.apache.samza.util.BlockingEnvelopeMap}
-   * @return this system descriptor
-   */
-  public EventHubsSystemDescriptor withReceiveQueueSize(int receiveQueueSize) {
-    this.consumerReceiveQueueSize = Optional.of(receiveQueueSize);
-    return this;
-  }
-
-  /**
-   * Maximum number of events that Event Hubs client can return in a receive call.
-   *
-   * @param count the number of max events per poll
-   * @return this system descriptor
-   */
-  public EventHubsSystemDescriptor withMaxEventCountPerPoll(int count) {
-    this.consumerMaxEventCountPerPoll = Optional.of(count);
-    return this;
-  }
-
-  /**
-   * Number of events that Event Hubs client should prefetch from the server.
-   *
-   * @param count the number of events that should be prefetched.
-   * @return this system descriptor
-   */
-  public EventHubsSystemDescriptor withPrefetchCount(int count) {
-    this.consumerPrefetchCount = Optional.of(count);
-    return this;
-  }
-
-
-  /**
-   * Configure the method that the message is partitioned for the downstream Event Hubs in one of the following ways:
-   * <ul>
-   *   <li>ROUND_ROBIN:
-   *   The message key and partition key are ignored and the message
-   *   will be distributed in a round-robin fashion amongst all the partitions in the downstream Event Hubs entity.</li>
-   *   <li>EVENT_HUB_HASHING:
-   *   Employs the hashing mechanism in Event Hubs to determine, based on the key of the message,
-   *   which partition the message should go. Using this method still ensures that all the events with
-   *   the same key are sent to the same partition in the event hub. If this option is chosen, the partition
-   *   key used for the hash should be a string. If the partition key is not set, the message key is
-   *   used instead.</li>
-   *   <li>PARTITION_KEY_AS_PARTITION:
-   *   Use the integer key specified by the partition key or key of the message to a specific partition
-   *   on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity,
-   *   a modulo operation will be performed to determine the resulting paritition.
-   *   ie. if there are 6 partitions and the key is 9, the message will end up in partition 3.
-   *   Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</li>
-   * </ul>
-   * @param partitioningMethod the desired partitioning method for the message in the downstream Event Hubs entity
-   * @return this system descriptor
-   */
-  public EventHubsSystemDescriptor withPartitioningMethod(PartitioningMethod partitioningMethod) {
-    this.producerEventhubsPartitioningMethod = Optional.ofNullable(partitioningMethod);
-    return this;
-  }
-
-  /**
-   *  If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData
-   *  message for Event Hubs. The Samza message key will not be sent otherwise.
-   *  Note: If the Samza Event Hubs consumer is used, this field is the partition key of the received EventData, or the
-   *  message key if the partition key is not present.
-   *
-   * @param sendKeys set to true if the message key should be sent in the EventData properties, the key is not sent otherwise
-   * @return this system descriptor
-   */
-  public EventHubsSystemDescriptor withSendKeys(boolean sendKeys) {
-    this.producerEventhubsSendKey = Optional.of(sendKeys);
-    return this;
-  }
-
-  @Override
-  public Map<String, String> toConfig() {
-    Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
-    String systemName = getSystemName();
-
-    if (!this.streamIds.isEmpty()) {
-      ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), String.join(",", this.streamIds));
-    }
-    this.fetchRuntimeInfoTimeout.ifPresent(timeout ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName), Integer.toString(timeout)));
-    this.numClientThreads.ifPresent(numClientThreads ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), Integer.toString(numClientThreads)));
-    this.consumerReceiveQueueSize.ifPresent(receiveQueueSize ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName), Integer.toString(receiveQueueSize)));
-    this.consumerMaxEventCountPerPoll.ifPresent(count ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), Integer.toString(count)));
-    this.consumerPrefetchCount.ifPresent(count ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName), Integer.toString(count)));
-    this.producerEventhubsSendKey.ifPresent(sendKeys ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName), Boolean.toString(sendKeys)));
-    this.producerEventhubsPartitioningMethod.ifPresent(partitioningMethod ->
-        ehConfigs.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), partitioningMethod.toString()));
-    return ehConfigs;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
new file mode 100644
index 0000000..cce716c
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.system.descriptors.InputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+
+
+/**
+ * A descriptor for the Event Hubs output stream
+ *<p>
+ *   An instance of this descriptor may be obtained from an {@link EventHubsSystemDescriptor}
+ *</p>
+ * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
+ * in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class EventHubsInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, EventHubsInputDescriptor<StreamMessageType>> {
+  private String namespace;
+  private String entityPath;
+  private Optional<String> sasKeyName = Optional.empty();
+  private Optional<String> sasToken = Optional.empty();
+  private Optional<String> consumerGroup = Optional.empty();
+
+  /**
+   * Constructs an {@link InputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param namespace namespace for the Event Hubs entity to consume from, not null
+   * @param entityPath entity path for the Event Hubs entity to consume from, not null
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  EventHubsInputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+      SystemDescriptor systemDescriptor) {
+    super(streamId, serde, systemDescriptor, null);
+    this.namespace = StringUtils.stripToNull(namespace);
+    this.entityPath = StringUtils.stripToNull(entityPath);
+    if (this.namespace == null || this.entityPath == null) {
+      throw new ConfigException(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
+          + "system: {%s}, stream: {%s}", getSystemName(), streamId));
+    }
+  }
+
+  /**
+   * SAS Key name of the associated input stream. Required to access the input Event Hubs entity per stream.
+   *
+   * @param sasKeyName the name of the SAS key required to access the Event Hubs entity
+   * @return this input descriptor
+   */
+  public EventHubsInputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
+    this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+    return this;
+  }
+
+  /**
+   * SAS Token of the associated input stream. Required to access the input Event Hubs per stream.
+   *
+   * @param sasToken the SAS token required to access the Event Hubs entity
+   * @return this input descriptor
+   */
+  public EventHubsInputDescriptor<StreamMessageType> withSasKey(String sasToken) {
+    this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+    return this;
+  }
+
+  /**
+   * Set the consumer group from the upstream Event Hubs entity that the consumer is part of. Defaults to the
+   * <code>$Default</code> group that is initially present in all Event Hubs entities (unless removed)
+   *
+   * @param consumerGroup the name of the consumer group upstream
+   * @return this input descriptor
+   */
+  public EventHubsInputDescriptor<StreamMessageType> withConsumerGroup(String consumerGroup) {
+    this.consumerGroup = Optional.of(StringUtils.stripToNull(consumerGroup));
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+
+    String streamId = getStreamId();
+
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId), namespace);
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId), entityPath);
+
+    sasKeyName.ifPresent(keyName ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
+    sasToken.ifPresent(key ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
+    this.consumerGroup.ifPresent(consumerGroupName ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId), consumerGroupName));
+    return ehConfigs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
new file mode 100644
index 0000000..cd17033
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.system.descriptors.OutputDescriptor;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+
+/**
+ * A descriptor for an Event Hubs output stream
+ * <p>
+ *   An instance of this descriptor may be obtained from and {@link EventHubsSystemDescriptor}
+ * </p>
+ * Stream properties configured using a descriptor overrides corresponding properties and property defaults provided
+ * in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream
+ */
+public class EventHubsOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, EventHubsOutputDescriptor<StreamMessageType>> {
+  private String namespace;
+  private String entityPath;
+  private Optional<String> sasKeyName = Optional.empty();
+  private Optional<String> sasToken = Optional.empty();
+
+  /**
+   * Constructs an {@link OutputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param namespace namespace for the Event Hubs entity to produce to, not null
+   * @param entityPath entity path for the Event Hubs entity to produce to, not null
+   * @param serde serde for messages in the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  EventHubsOutputDescriptor(String streamId, String namespace, String entityPath, Serde serde,
+      SystemDescriptor systemDescriptor) {
+    super(streamId, serde, systemDescriptor);
+    this.namespace = StringUtils.stripToNull(namespace);
+    this.entityPath = StringUtils.stripToNull(entityPath);
+    if (this.namespace == null || this.entityPath == null) {
+      throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
+          + "system: {%s}, stream: {%s}", getSystemName(), streamId));
+    }
+  }
+
+  /**
+   * SAS Key name of the associated output stream. Required to access the output Event Hubs entity per stream.
+   *
+   * @param sasKeyName the name of the SAS key required to access the Event Hubs entity
+   * @return this output descriptor
+   */
+  public EventHubsOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
+    this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+    return this;
+  }
+
+  /**
+   * SAS Token of the associated output stream. Required to access the output Event Hubs per stream.
+   *
+   * @param sasToken the SAS token required to access to Event Hubs entity
+   * @return this output descriptor
+   */
+  public EventHubsOutputDescriptor<StreamMessageType> withSasKey(String sasToken) {
+    this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+
+    String streamId = getStreamId();
+
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId), namespace);
+    ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId), entityPath);
+    sasKeyName.ifPresent(keyName ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
+    sasToken.ifPresent(key ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
+    return ehConfigs;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
new file mode 100644
index 0000000..4e292d9
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.system.descriptors.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.EventHubSystemFactory;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
+
+
+/**
+ * A descriptor for a Event Hubs system.
+ * <p>
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ */
+public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
+  private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
+
+  private List<String> streamIds = new ArrayList<>();
+  private Optional<Integer> fetchRuntimeInfoTimeout = Optional.empty();
+  private Optional<Integer> numClientThreads = Optional.empty();
+  private Optional<Integer> consumerReceiveQueueSize = Optional.empty();
+  private Optional<Integer> consumerMaxEventCountPerPoll = Optional.empty();
+  private Optional<Integer> consumerPrefetchCount = Optional.empty();
+  private Optional<Boolean> producerEventhubsSendKey = Optional.empty();
+  private Optional<PartitioningMethod> producerEventhubsPartitioningMethod = Optional.empty();
+
+  /**
+   * Constructs a {@link SystemDescriptor} instance.
+   *  @param systemName name of this system
+   */
+  public EventHubsSystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+  }
+
+  /**
+   * Gets an {@link EventHubsInputDescriptor} for the input stream of this system. The stream has the provided
+   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+   * <p>
+   * The type of messages in the stream is the type of the provided stream level serde.
+   *
+   * @param streamId id of the input stream
+   * @param namespace namespace of the Event Hubs entity to consume from
+   * @param entityPath entity path of the Event Hubs entity to consume from
+   * @param serde stream level serde for the input stream
+   * @param <StreamMessageType> type of messages in this stream
+   * @return an {@link EventHubsInputDescriptor} for the Event Hubs input stream
+   */
+  public <StreamMessageType> EventHubsInputDescriptor<StreamMessageType> getInputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<StreamMessageType> serde) {
+    streamIds.add(streamId);
+    return new EventHubsInputDescriptor<>(streamId, namespace, entityPath, serde, this);
+  }
+
+  /**
+   * Gets an {@link EventHubsOutputDescriptor} for the output stream of this system. The stream has the provided
+   * namespace and entity name of the associated Event Hubs entity and the provided stream level serde.
+   * <p>
+   * The type of messages in the stream is the type of the provided stream level serde.
+   *
+   * @param streamId id of the output stream
+   * @param namespace namespace of the Event Hubs entity to produce to
+   * @param entityPath entity path of the Event Hubs entity to produce to
+   * @param serde stream level serde for the output stream
+   * @param <StreamMessageType> type of the messages in this stream
+   * @return an {@link EventHubsOutputDescriptor} for the Event Hubs output stream
+   */
+  public <StreamMessageType> EventHubsOutputDescriptor<StreamMessageType> getOutputDescriptor(String streamId, String namespace,
+      String entityPath, Serde<StreamMessageType> serde) {
+    streamIds.add(streamId);
+    return new EventHubsOutputDescriptor<>(streamId, namespace, entityPath, serde, this);
+  }
+
+  /**
+   * Timeout for fetching the runtime metadata from an Event Hubs entity on startup in millis.
+   *
+   * @param timeoutMS the timeout in ms for getting runtime information from the Event Hubs system
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withRuntimeInfoTimeout(int timeoutMS) {
+    this.fetchRuntimeInfoTimeout = Optional.of(timeoutMS);
+    return this;
+  }
+
+  /**
+   * Number of threads in thread pool that will be used by the EventHubClient.
+   *
+   * @param numClientThreads the number of threads
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withNumClientThreads(int numClientThreads) {
+    this.numClientThreads = Optional.of(numClientThreads);
+    return this;
+  }
+
+  /**
+   *  Per partition capacity of the Event Hubs consumer buffer - the blocking queue used for storing messages.
+   *  Larger buffer capacity typically leads to better throughput but consumes more memory.
+   *
+   * @param receiveQueueSize the number of messages from Event Hubs that should be buffered in the
+   *                      {@link org.apache.samza.util.BlockingEnvelopeMap}
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withReceiveQueueSize(int receiveQueueSize) {
+    this.consumerReceiveQueueSize = Optional.of(receiveQueueSize);
+    return this;
+  }
+
+  /**
+   * Maximum number of events that Event Hubs client can return in a receive call.
+   *
+   * @param count the number of max events per poll
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withMaxEventCountPerPoll(int count) {
+    this.consumerMaxEventCountPerPoll = Optional.of(count);
+    return this;
+  }
+
+  /**
+   * Number of events that Event Hubs client should prefetch from the server.
+   *
+   * @param count the number of events that should be prefetched.
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withPrefetchCount(int count) {
+    this.consumerPrefetchCount = Optional.of(count);
+    return this;
+  }
+
+
+  /**
+   * Configure the method that the message is partitioned for the downstream Event Hubs in one of the following ways:
+   * <ul>
+   *   <li>ROUND_ROBIN:
+   *   The message key and partition key are ignored and the message
+   *   will be distributed in a round-robin fashion amongst all the partitions in the downstream Event Hubs entity.</li>
+   *   <li>EVENT_HUB_HASHING:
+   *   Employs the hashing mechanism in Event Hubs to determine, based on the key of the message,
+   *   which partition the message should go. Using this method still ensures that all the events with
+   *   the same key are sent to the same partition in the event hub. If this option is chosen, the partition
+   *   key used for the hash should be a string. If the partition key is not set, the message key is
+   *   used instead.</li>
+   *   <li>PARTITION_KEY_AS_PARTITION:
+   *   Use the integer key specified by the partition key or key of the message to a specific partition
+   *   on Event Hubs. If the integer key is greater than the number of partitions in the destination Event Hubs entity,
+   *   a modulo operation will be performed to determine the resulting paritition.
+   *   ie. if there are 6 partitions and the key is 9, the message will end up in partition 3.
+   *   Similarly to EVENT_HUB_HASHING, if the partition key is not set the message key is used instead.</li>
+   * </ul>
+   * @param partitioningMethod the desired partitioning method for the message in the downstream Event Hubs entity
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withPartitioningMethod(PartitioningMethod partitioningMethod) {
+    this.producerEventhubsPartitioningMethod = Optional.ofNullable(partitioningMethod);
+    return this;
+  }
+
+  /**
+   *  If set to true, the key of the Samza message will be included as the 'key' property in the outgoing EventData
+   *  message for Event Hubs. The Samza message key will not be sent otherwise.
+   *  Note: If the Samza Event Hubs consumer is used, this field is the partition key of the received EventData, or the
+   *  message key if the partition key is not present.
+   *
+   * @param sendKeys set to true if the message key should be sent in the EventData properties, the key is not sent otherwise
+   * @return this system descriptor
+   */
+  public EventHubsSystemDescriptor withSendKeys(boolean sendKeys) {
+    this.producerEventhubsSendKey = Optional.of(sendKeys);
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
+    String systemName = getSystemName();
+
+    if (!this.streamIds.isEmpty()) {
+      ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), String.join(",", this.streamIds));
+    }
+    this.fetchRuntimeInfoTimeout.ifPresent(timeout ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName), Integer.toString(timeout)));
+    this.numClientThreads.ifPresent(numClientThreads ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName), Integer.toString(numClientThreads)));
+    this.consumerReceiveQueueSize.ifPresent(receiveQueueSize ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName), Integer.toString(receiveQueueSize)));
+    this.consumerMaxEventCountPerPoll.ifPresent(count ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName), Integer.toString(count)));
+    this.consumerPrefetchCount.ifPresent(count ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName), Integer.toString(count)));
+    this.producerEventhubsSendKey.ifPresent(sendKeys ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName), Boolean.toString(sendKeys)));
+    this.producerEventhubsPartitioningMethod.ifPresent(partitioningMethod ->
+        ehConfigs.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName), partitioningMethod.toString()));
+    return ehConfigs;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java
deleted file mode 100644
index b3003a3..0000000
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsInputDescriptor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.Map;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-
-public class TestEventHubsInputDescriptor {
-  @Test
-  public void testEntityConnectionConfigs() {
-    String systemName = "eventHub";
-    String streamId = "input-stream";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
-    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
-        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
-        .withSasKeyName("secretkey")
-        .withSasKey("sasToken-123")
-        .withConsumerGroup("$notdefault");
-
-    Map<String, String> generatedConfigs = inputDescriptor.toConfig();
-    assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
-    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
-    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
-    assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
-    assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
-    assertEquals("$notdefault", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
-  }
-
-  @Test
-  public void testWithoutEntityConnectionConfigs() {
-    String systemName = "eventHub";
-    String streamId = "input-stream";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
-    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
-        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
-
-    Map<String, String> generatedConfigs = inputDescriptor.toConfig();
-    assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
-    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
-    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
-    assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
-  }
-
-  @Test
-  public void testMissingInputDescriptorFields() {
-    String systemName = "eventHub";
-    String streamId = "input-stream";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-    try {
-      systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
-      fail("Should have thrown Config Exception");
-    } catch (ConfigException exception) {
-      assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
-          + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java
deleted file mode 100644
index fcfcdca..0000000
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsOutputDescriptor.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.Map;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.KV;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-public class TestEventHubsOutputDescriptor {
-  @Test
-  public void testEntityConnectionConfigs() {
-    String systemName = "eventHub";
-    String streamId = "output-stream";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
-    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
-        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
-        .withSasKeyName("secretkey")
-        .withSasKey("sasToken-123");
-
-    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
-    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
-    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
-    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
-    assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
-    assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
-  }
-
-  @Test
-  public void testWithoutEntityConnectionConfigs() {
-    String systemName = "eventHub";
-    String streamId = "output-stream";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
-    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
-        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
-
-    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
-    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
-    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
-    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
-    assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
-  }
-
-  @Test
-  public void testMissingOutputDescriptorFields() {
-    String systemName = "eventHub";
-    String streamId = "input-stream";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-    try {
-      systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
-      fail("Should have thrown Config Exception");
-    } catch (ConfigException exception) {
-      assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
-          + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
deleted file mode 100644
index 33bb1ba..0000000
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/TestEventHubsSystemDescriptor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system.eventhub;
-
-import java.util.Map;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
-public class TestEventHubsSystemDescriptor {
-  @Test
-  public void testWithDescriptorOverrides() {
-    String systemName = "system-name";
-    String streamId1 = "input-stream1";
-    String streamId2 = "input-stream2";
-    String streamId3 = "output-stream1";
-    String streamId4 = "output-stream2";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName)
-        .withMaxEventCountPerPoll(1000)
-        .withNumClientThreads(5)
-        .withPartitioningMethod(PartitioningMethod.PARTITION_KEY_AS_PARTITION)
-        .withPrefetchCount(100)
-        .withReceiveQueueSize(500)
-        .withRuntimeInfoTimeout(60000)
-        .withSendKeys(false);
-
-    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
-            new IntegerSerde()));
-
-    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
-    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
-    assertEquals("1000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
-    assertEquals("5", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
-    assertEquals("PARTITION_KEY_AS_PARTITION", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
-    assertEquals("100", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
-    assertEquals("500", generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
-    assertEquals("60000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
-    assertEquals("false", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
-    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
-  }
-
-  @Test
-  public void testWithoutDescriptorOverrides() {
-    String systemName = "eventHub";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
-    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
-    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
-    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
-    assertEquals(1, generatedConfigs.size());
-  }
-  @Test
-  public void testWithInputOutputStreams() {
-    String systemName = "system-name";
-    String streamId1 = "input-stream1";
-    String streamId2 = "input-stream2";
-    String streamId3 = "output-stream1";
-    String streamId4 = "output-stream2";
-
-    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
-
-    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
-        new IntegerSerde()));
-
-    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
-    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
-    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
-    assertEquals(2, generatedConfigs.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
new file mode 100644
index 0000000..1e6b368
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsInputDescriptor.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.Map;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+
+public class TestEventHubsInputDescriptor {
+  @Test
+  public void testEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+        .withSasKeyName("secretkey")
+        .withSasKey("sasToken-123")
+        .withConsumerGroup("$notdefault");
+
+    Map<String, String> generatedConfigs = inputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+    assertEquals("$notdefault", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+  }
+
+  @Test
+  public void testWithoutEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsInputDescriptor<KV<String, Integer>> inputDescriptor = systemDescriptor
+        .getInputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = inputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.input-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+    assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
+  }
+
+  @Test
+  public void testMissingInputDescriptorFields() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+    try {
+      systemDescriptor.getInputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+      fail("Should have thrown Config Exception");
+    } catch (ConfigException exception) {
+      assertEquals(String.format("Missing namespace and entity path Event Hubs input descriptor in " //
+          + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
new file mode 100644
index 0000000..fa8ae56
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsOutputDescriptor.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.Map;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+public class TestEventHubsOutputDescriptor {
+  @Test
+  public void testEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "output-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()))
+        .withSasKeyName("secretkey")
+        .withSasKey("sasToken-123");
+
+    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertEquals("secretkey", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertEquals("sasToken-123", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+  }
+
+  @Test
+  public void testWithoutEntityConnectionConfigs() {
+    String systemName = "eventHub";
+    String streamId = "output-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    EventHubsOutputDescriptor<KV<String, Integer>> outputDescriptor = systemDescriptor
+        .getOutputDescriptor(streamId, "entity-namespace", "entity3", KVSerde.of(new StringSerde(), new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = outputDescriptor.toConfig();
+    assertEquals("eventHub", generatedConfigs.get("streams.output-stream.samza.system"));
+    assertEquals("entity-namespace", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamId)));
+    assertEquals("entity3", generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId)));
+    assertEquals(3, generatedConfigs.size()); // verify that there are no other configs
+  }
+
+  @Test
+  public void testMissingOutputDescriptorFields() {
+    String systemName = "eventHub";
+    String streamId = "input-stream";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+    try {
+      systemDescriptor.getOutputDescriptor(streamId, null, null, KVSerde.of(new StringSerde(), new IntegerSerde()));
+      fail("Should have thrown Config Exception");
+    } catch (ConfigException exception) {
+      assertEquals(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
+          + "system: {%s}, stream: {%s}", systemName, streamId), exception.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
new file mode 100644
index 0000000..7f73bd9
--- /dev/null
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/descriptors/TestEventHubsSystemDescriptor.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.eventhub.descriptors;
+
+import java.util.Map;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.eventhub.EventHubConfig;
+import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.PartitioningMethod;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestEventHubsSystemDescriptor {
+  @Test
+  public void testWithDescriptorOverrides() {
+    String systemName = "system-name";
+    String streamId1 = "input-stream1";
+    String streamId2 = "input-stream2";
+    String streamId3 = "output-stream1";
+    String streamId4 = "output-stream2";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName)
+        .withMaxEventCountPerPoll(1000)
+        .withNumClientThreads(5)
+        .withPartitioningMethod(PartitioningMethod.PARTITION_KEY_AS_PARTITION)
+        .withPrefetchCount(100)
+        .withReceiveQueueSize(500)
+        .withRuntimeInfoTimeout(60000)
+        .withSendKeys(false);
+
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
+            new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertEquals("1000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
+    assertEquals("5", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
+    assertEquals("PARTITION_KEY_AS_PARTITION", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
+    assertEquals("100", generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
+    assertEquals("500", generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
+    assertEquals("60000", generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
+    assertEquals("false", generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
+    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
+  }
+
+  @Test
+  public void testWithoutDescriptorOverrides() {
+    String systemName = "eventHub";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_MAX_EVENT_COUNT_PER_POLL, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SYSTEM_NUM_CLIENT_THREADS, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_PREFETCH_COUNT, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_CONSUMER_BUFFER_CAPACITY, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_FETCH_RUNTIME_INFO_TIMEOUT_MILLIS, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_SEND_KEY_IN_EVENT_PROPERTIES, systemName)));
+    assertNull(generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
+    assertEquals(1, generatedConfigs.size());
+  }
+  @Test
+  public void testWithInputOutputStreams() {
+    String systemName = "system-name";
+    String streamId1 = "input-stream1";
+    String streamId2 = "input-stream2";
+    String streamId3 = "output-stream1";
+    String streamId4 = "output-stream2";
+
+    EventHubsSystemDescriptor systemDescriptor = new EventHubsSystemDescriptor(systemName);
+
+    systemDescriptor.getInputDescriptor(streamId1, "entity-namespace1", "entity1", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getInputDescriptor(streamId2, "entity-namespace2", "entity2", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId3, "entity-namespace3", "entity3", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+    systemDescriptor.getOutputDescriptor(streamId4, "entity-namespace4", "entity4", KVSerde.of(new StringSerde(),
+        new IntegerSerde()));
+
+    Map<String, String> generatedConfigs = systemDescriptor.toConfig();
+    assertEquals("org.apache.samza.system.eventhub.EventHubSystemFactory", generatedConfigs.get(String.format("systems.%s.samza.factory", systemName)));
+    assertEquals(streamId1 + "," + streamId2 + "," + streamId3 + "," + streamId4, generatedConfigs.get(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName)));
+    assertEquals(2, generatedConfigs.size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
deleted file mode 100644
index 5416af5..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.context.ApplicationContainerContext;
-import org.apache.samza.context.ApplicationContainerContextFactory;
-import org.apache.samza.context.ApplicationTaskContext;
-import org.apache.samza.context.ApplicationTaskContextFactory;
-import org.apache.samza.metrics.MetricsReporterFactory;
-import org.apache.samza.operators.KV;
-import org.apache.samza.operators.TableDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
-import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
-import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
-import org.apache.samza.operators.spec.InputOperatorSpec;
-import org.apache.samza.runtime.ProcessorLifecycleListener;
-import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.serializers.Serde;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * This is the base class that implements interface {@link ApplicationDescriptor}.
- * <p>
- * This base class contains the common objects that are used by both high-level and low-level API applications, such as
- * {@link Config}, {@link ApplicationContainerContextFactory}, {@link ApplicationTaskContextFactory}, and
- * {@link ProcessorLifecycleListenerFactory}.
- *
- * @param <S> the type of {@link ApplicationDescriptor} interface this implements. It has to be either
- *            {@link StreamApplicationDescriptor} or {@link TaskApplicationDescriptor}
- */
-public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor>
-    implements ApplicationDescriptor<S> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class);
-
-  private final Class<? extends SamzaApplication> appClass;
-  private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>();
-  // serdes used by input/output/intermediate streams, keyed by streamId
-  private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>();
-  // serdes used by tables, keyed by tableId
-  private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>();
-  final Config config;
-
-  private Optional<ApplicationContainerContextFactory<?>> applicationContainerContextFactoryOptional = Optional.empty();
-  private Optional<ApplicationTaskContextFactory<?>> applicationTaskContextFactoryOptional = Optional.empty();
-
-  // Default to no-op  ProcessorLifecycleListenerFactory
-  ProcessorLifecycleListenerFactory listenerFactory = (pcontext, cfg) -> new ProcessorLifecycleListener() { };
-
-  ApplicationDescriptorImpl(SamzaApplication app, Config config) {
-    this.config = config;
-    this.appClass = app.getClass();
-  }
-
-  @Override
-  public Config getConfig() {
-    return config;
-  }
-
-  @Override
-  public S withApplicationContainerContextFactory(ApplicationContainerContextFactory<?> factory) {
-    this.applicationContainerContextFactoryOptional = Optional.of(factory);
-    return (S) this;
-  }
-
-  @Override
-  public S withApplicationTaskContextFactory(ApplicationTaskContextFactory<?> factory) {
-    this.applicationTaskContextFactoryOptional = Optional.of(factory);
-    return (S) this;
-  }
-
-  @Override
-  public S withProcessorLifecycleListenerFactory(ProcessorLifecycleListenerFactory listenerFactory) {
-    this.listenerFactory = listenerFactory;
-    return (S) this;
-  }
-
-  @Override
-  public S withMetricsReporterFactories(Map<String, MetricsReporterFactory> reporterFactories) {
-    this.reporterFactories.clear();
-    this.reporterFactories.putAll(reporterFactories);
-    return (S) this;
-  }
-
-  /**
-   * Get the application class
-   *
-   * @return an implementation of {@link SamzaApplication}
-   */
-  public Class<? extends SamzaApplication> getAppClass() {
-    return appClass;
-  }
-
-  /**
-   * Get the {@link ApplicationContainerContextFactory} specified by the application.
-   *
-   * @return {@link ApplicationContainerContextFactory} if application specified it; empty otherwise
-   */
-  public Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> getApplicationContainerContextFactory() {
-    @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationContainerContext
-    Optional<ApplicationContainerContextFactory<ApplicationContainerContext>> factoryOptional =
-        (Optional) this.applicationContainerContextFactoryOptional;
-    return factoryOptional;
-  }
-
-  /**
-   * Get the {@link ApplicationTaskContextFactory} specified by the application.
-   *
-   * @return {@link ApplicationTaskContextFactory} if application specified it; empty otherwise
-   */
-  public Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> getApplicationTaskContextFactory() {
-    @SuppressWarnings("unchecked") // ok because all context types are at least ApplicationTaskContext
-    Optional<ApplicationTaskContextFactory<ApplicationTaskContext>> factoryOptional =
-        (Optional) this.applicationTaskContextFactoryOptional;
-    return factoryOptional;
-  }
-
-  /**
-   * Get the {@link ProcessorLifecycleListenerFactory} associated with this application
-   *
-   * @return the {@link ProcessorLifecycleListenerFactory} in this application
-   */
-  public ProcessorLifecycleListenerFactory getProcessorLifecycleListenerFactory() {
-    return listenerFactory;
-  }
-
-  /**
-   * Get the {@link MetricsReporterFactory}s used in the application
-   *
-   * @return the map of {@link MetricsReporterFactory}s
-   */
-  public Map<String, MetricsReporterFactory> getMetricsReporterFactories() {
-    return Collections.unmodifiableMap(reporterFactories);
-  }
-
-  /**
-   * Get the default {@link SystemDescriptor} in this application
-   *
-   * @return the default {@link SystemDescriptor}
-   */
-  public Optional<SystemDescriptor> getDefaultSystemDescriptor() {
-    // default is not set
-    return Optional.empty();
-  }
-
-  /**
-   * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
-   *
-   * @param streamId id of the stream
-   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
-   */
-  public KV<Serde, Serde> getStreamSerdes(String streamId) {
-    return streamSerdes.get(streamId);
-  }
-
-  /**
-   * Get the corresponding {@link KVSerde} for the input {@code inputStreamId}
-   *
-   * @param tableId id of the table
-   * @return the {@link KVSerde} for the stream. null if the serde is not defined or {@code streamId} does not exist
-   */
-  public KV<Serde, Serde> getTableSerdes(String tableId) {
-    return tableSerdes.get(tableId);
-  }
-
-  /**
-   * Get the map of all {@link InputOperatorSpec}s in this applicaiton
-   *
-   * @return an immutable map from streamId to {@link InputOperatorSpec}. Default to empty map for low-level {@link TaskApplication}
-   */
-  public Map<String, InputOperatorSpec> getInputOperators() {
-    return Collections.EMPTY_MAP;
-  }
-
-  /**
-   * Get all the {@link InputDescriptor}s to this application
-   *
-   * @return an immutable map of streamId to {@link InputDescriptor}
-   */
-  public abstract Map<String, InputDescriptor> getInputDescriptors();
-
-  /**
-   * Get all the {@link OutputDescriptor}s from this application
-   *
-   * @return an immutable map of streamId to {@link OutputDescriptor}
-   */
-  public abstract Map<String, OutputDescriptor> getOutputDescriptors();
-
-  /**
-   * Get all the broadcast streamIds from this application
-   *
-   * @return an immutable set of streamIds
-   */
-  public abstract Set<String> getBroadcastStreams();
-
-  /**
-   * Get all the {@link TableDescriptor}s in this application
-   *
-   * @return an immutable set of {@link TableDescriptor}s
-   */
-  public abstract Set<TableDescriptor> getTableDescriptors();
-
-  /**
-   * Get all the unique {@link SystemDescriptor}s in this application
-   *
-   * @return an immutable set of {@link SystemDescriptor}s
-   */
-  public abstract Set<SystemDescriptor> getSystemDescriptors();
-
-  /**
-   * Get all the unique input streamIds in this application
-   *
-   * @return an immutable set of input streamIds
-   */
-  public abstract Set<String> getInputStreamIds();
-
-  /**
-   * Get all the unique output streamIds in this application
-   *
-   * @return an immutable set of output streamIds
-   */
-  public abstract Set<String> getOutputStreamIds();
-
-  KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) {
-    Serde keySerde, valueSerde;
-
-    KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId);
-
-    if (serde instanceof KVSerde) {
-      keySerde = ((KVSerde) serde).getKeySerde();
-      valueSerde = ((KVSerde) serde).getValueSerde();
-    } else {
-      keySerde = new NoOpSerde();
-      valueSerde = serde;
-    }
-
-    if (currentSerdePair == null) {
-      if (keySerde instanceof NoOpSerde) {
-        LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
-            ". Keys will not be (de)serialized");
-      }
-      if (valueSerde instanceof NoOpSerde) {
-        LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
-            ". Values will not be (de)serialized");
-      }
-      streamSerdes.put(streamId, KV.of(keySerde, valueSerde));
-    } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
-      throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. Cannot change it to "
-          + "different serdes.", streamId));
-    }
-    return streamSerdes.get(streamId);
-  }
-
-  KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) {
-    Serde keySerde, valueSerde;
-    keySerde = kvSerde.getKeySerde();
-    valueSerde = kvSerde.getValueSerde();
-
-    if (!tableSerdes.containsKey(tableId)) {
-      tableSerdes.put(tableId, KV.of(keySerde, valueSerde));
-      return tableSerdes.get(tableId);
-    }
-
-    KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId);
-    if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) {
-      throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to "
-          + "different serdes.", tableId));
-    }
-    return streamSerdes.get(tableId);
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/74675cea/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
deleted file mode 100644
index acf4bf7..0000000
--- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorUtil.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.application;
-
-import org.apache.samza.config.Config;
-
-
-/**
- * Util class to help creating {@link ApplicationDescriptorImpl} instance from {@link SamzaApplication} and {@link Config}
- */
-public class ApplicationDescriptorUtil {
-
-  private ApplicationDescriptorUtil() {
-
-  }
-
-  /**
-   * Create a new instance of {@link ApplicationDescriptorImpl} based on {@link SamzaApplication} and {@link Config}
-   *
-   * @param app an implementation of {@link SamzaApplication}. The {@code app} has to have a proper fully-qualified class name.
-   * @param config the {@link Config} for the application
-   * @return the {@link ApplicationDescriptorImpl} instance containing the processing logic and the config
-   */
-  public static ApplicationDescriptorImpl<? extends ApplicationDescriptor> getAppDescriptor(SamzaApplication app, Config config) {
-    if (app instanceof StreamApplication) {
-      return new StreamApplicationDescriptorImpl((StreamApplication) app, config);
-    }
-    if (app instanceof TaskApplication) {
-      return new TaskApplicationDescriptorImpl((TaskApplication) app, config);
-    }
-    throw new IllegalArgumentException(String.format("User application class %s is not supported. Only StreamApplication "
-        + "and TaskApplication are supported.", app.getClass().getName()));
-  }
-
-}
\ No newline at end of file