You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/03/23 00:08:19 UTC

samza git commit: Making event hub configs Samza compliant

Repository: samza
Updated Branches:
  refs/heads/master 956cf412a -> 272aa32e3


Making event hub configs Samza compliant

Fixes for Bugs

- SAMZA-1571 Make Eventhubs system configs compatible with Samza standalone.
- SAMZA-1624 EventHub system should prefix the configs for SasKey and SasToken with "sensitive"
- SAMZA-1625 EventHub systemAdmin is swallowing exceptions
- SAMZA-1626 EventHub system admin is not returning the metadata for all the requested SSPs

Description

1. Right now the Event Hub system doesn't follow Samza's config convention of prefixing the names of secret configs with "sensitive" so that their values are masked before they are logged.
2. Event Hub configs use the old systems.<systemName>.streams.<streamName> format, which is blacklisted in Samza standalone, so these configs are moved to the newer streams.<streamId> format.
3. Logging the underlying exception and passing it as the cause of the SamzaException thrown in EventHubSystemAdmin, instead of dropping it.
4. Porting Bharat's fix to return the metadata for all the requested SSPs in EventHubSystemAdmin

Author: Srinivasulu Punuru <sp...@linkedin.com>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #453 from srinipunuru/eh.1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/272aa32e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/272aa32e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/272aa32e

Branch: refs/heads/master
Commit: 272aa32e3045a7eb4b429e7790931a914317eef3
Parents: 956cf41
Author: Srinivasulu Punuru <sp...@linkedin.com>
Authored: Thu Mar 22 17:08:11 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Thu Mar 22 17:08:11 2018 -0700

----------------------------------------------------------------------
 .../samza/system/eventhub/EventHubConfig.java   | 30 ++++++------
 .../eventhub/admin/EventHubSystemAdmin.java     | 10 ++--
 .../eventhub/MockEventHubConfigFactory.java     | 18 ++++----
 .../consumer/TestEventHubSystemConsumer.java    | 48 ++++++++++----------
 .../producer/TestEventHubSystemProducer.java    | 24 +++++-----
 5 files changed, 66 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
index 5d83911..e9c383a 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
@@ -36,15 +36,15 @@ public class EventHubConfig extends MapConfig {
 
   public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";
 
-  public static final String CONFIG_STREAM_NAMESPACE = "systems.%s.streams.%s.eventhubs.namespace";
+  public static final String CONFIG_STREAM_NAMESPACE = "streams.%s.eventhubs.namespace";
 
-  public static final String CONFIG_STREAM_ENTITYPATH = "systems.%s.streams.%s.eventhubs.entitypath";
+  public static final String CONFIG_STREAM_ENTITYPATH = "streams.%s.eventhubs.entitypath";
 
-  public static final String CONFIG_STREAM_SAS_KEY_NAME = "systems.%s.streams.%s.eventhubs.sas.keyname";
+  public static final String CONFIG_STREAM_SAS_KEY_NAME = Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.keyname";
 
-  public static final String CONFIG_STREAM_SAS_TOKEN = "systems.%s.streams.%s.eventhubs.sas.token";
+  public static final String CONFIG_STREAM_SAS_TOKEN = Config.SENSITIVE_PREFIX + "streams.%s.eventhubs.sas.token";
 
-  public static final String CONFIG_STREAM_CONSUMER_GROUP = "systems.%s.streams.%s.eventhubs.consumer.group";
+  public static final String CONFIG_STREAM_CONSUMER_GROUP = "streams.%s.eventhubs.consumer.group";
   public static final String DEFAULT_CONFIG_STREAM_CONSUMER_GROUP = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME;
 
   public static final String CONFIG_PRODUCER_PARTITION_METHOD = "systems.%s.eventhubs.partition.method";
@@ -71,18 +71,18 @@ public class EventHubConfig extends MapConfig {
             .forEach((streamId) -> physcialToId.put(streamConfig.getPhysicalName(streamId), streamId));
   }
 
-  private String getFromStreamIdOrName(String configName, String systemName, String streamName, String defaultString) {
-    String result = getFromStreamIdOrName(configName, systemName, streamName);
+  private String getFromStreamIdOrName(String configName, String streamName, String defaultString) {
+    String result = getFromStreamIdOrName(configName, streamName);
     if (result == null) {
       return defaultString;
     }
     return result;
   }
 
-  private String getFromStreamIdOrName(String configName, String systemName, String streamName) {
+  private String getFromStreamIdOrName(String configName, String streamName) {
     String streamId = getStreamId(streamName);
-    return get(String.format(configName, systemName, streamId),
-            streamId.equals(streamName) ? null : get(String.format(configName, systemName, streamName)));
+    return get(String.format(configName, streamId),
+            streamId.equals(streamName) ? null : get(String.format(configName, streamName)));
   }
 
   private String validateRequiredConfig(String value, String fieldName, String systemName, String streamName) {
@@ -122,7 +122,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs namespace
    */
   public String getStreamNamespace(String systemName, String streamName) {
-    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, systemName, streamName),
+    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_NAMESPACE, streamName),
             "Namespace", systemName, streamName);
   }
 
@@ -134,7 +134,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs entity path
    */
   public String getStreamEntityPath(String systemName, String streamName) {
-    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, systemName, streamName),
+    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_ENTITYPATH, streamName),
             "EntityPath", systemName, streamName);
   }
 
@@ -146,7 +146,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs SAS key name
    */
   public String getStreamSasKeyName(String systemName, String streamName) {
-    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName),
+    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_KEY_NAME, streamName),
             "SASKeyName", systemName, streamName);
   }
 
@@ -158,7 +158,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs SAS token
    */
   public String getStreamSasToken(String systemName, String streamName) {
-    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, systemName, streamName),
+    return validateRequiredConfig(getFromStreamIdOrName(CONFIG_STREAM_SAS_TOKEN, streamName),
             "SASToken", systemName, streamName);
   }
 
@@ -170,7 +170,7 @@ public class EventHubConfig extends MapConfig {
    * @return EventHubs consumer group
    */
   public String getStreamConsumerGroup(String systemName, String streamName) {
-    return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, systemName, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
+    return getFromStreamIdOrName(CONFIG_STREAM_CONSUMER_GROUP, streamName, DEFAULT_CONFIG_STREAM_CONSUMER_GROUP);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
index 91d504c..5564747 100644
--- a/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
+++ b/samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
@@ -104,9 +104,10 @@ public class EventHubSystemAdmin implements SystemAdmin {
               streamPartitions.put(streamName, ehInfo.getPartitionIds());
             } catch (InterruptedException | ExecutionException | TimeoutException e) {
 
-              String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s",
-                      systemName, streamName);
-              throw new SamzaException(msg);
+              String msg = String.format("Error while fetching EventHubRuntimeInfo for System:%s, Stream:%s", systemName,
+                  streamName);
+              LOG.error(msg, e);
+              throw new SamzaException(msg, e);
             }
           }
 
@@ -172,7 +173,8 @@ public class EventHubSystemAdmin implements SystemAdmin {
           String msg = String.format(
                   "Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s, Partition:%s",
                   systemName, streamName, partitionId);
-          throw new SamzaException(msg);
+          LOG.error(msg, e);
+          throw new SamzaException(msg, e);
         }
       });
     return sspMetadataMap;

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
index 1d8e0ce..0f512ac 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubConfigFactory.java
@@ -46,15 +46,15 @@ public class MockEventHubConfigFactory {
     mapConfig.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, SYSTEM_NAME), partitioningMethod.toString());
     mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, SYSTEM_NAME), STREAM_NAME1 + "," + STREAM_NAME2);
 
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_NAMESPACE);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_ENTITY1);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY);
-
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_NAMESPACE);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_ENTITY2);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME);
-    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_NAME1), EVENTHUB_NAMESPACE);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_NAME1), EVENTHUB_ENTITY1);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_NAME1), EVENTHUB_KEY);
+
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, STREAM_NAME2), EVENTHUB_NAMESPACE);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, STREAM_NAME2), EVENTHUB_ENTITY2);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME);
+    mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, STREAM_NAME2), EVENTHUB_KEY);
 
     return new MapConfig(mapConfig);
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
index 865a248..4fced77 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
@@ -83,10 +83,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
@@ -123,10 +123,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
@@ -173,10 +173,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
@@ -225,10 +225,10 @@ public class TestEventHubSystemConsumer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), MOCK_ENTITY_1);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
@@ -282,14 +282,14 @@ public class TestEventHubSystemConsumer {
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName),
             String.format("%s,%s", streamName1, streamName2));
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName1), MOCK_ENTITY_1);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName1), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName1), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName1), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName2), MOCK_ENTITY_2);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName2), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName2), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName2), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName1), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName1), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName1), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName1), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName2), MOCK_ENTITY_2);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName2), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName2), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName2), EVENTHUB_KEY);
     MapConfig config = new MapConfig(configMap);
 
     MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);

http://git-wip-us.apache.org/repos/asf/samza/blob/272aa32e/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
index ef73775..8572e95 100644
--- a/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
+++ b/samza-azure/src/test/java/org/apache/samza/system/eventhub/producer/TestEventHubSystemProducer.java
@@ -74,10 +74,10 @@ public class TestEventHubSystemProducer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
     configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
             PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
     MapConfig config = new MapConfig(configMap);
@@ -125,10 +125,10 @@ public class TestEventHubSystemProducer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
     configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),
             PartitioningMethod.PARTITION_KEY_AS_PARTITION.toString());
     MapConfig config = new MapConfig(configMap);
@@ -182,10 +182,10 @@ public class TestEventHubSystemProducer {
     // Set configs
     Map<String, String> configMap = new HashMap<>();
     configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, systemName, streamName), EVENTHUB_NAMESPACE);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, systemName, streamName), EVENTHUB_KEY_NAME);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, systemName, streamName), EVENTHUB_KEY);
-    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, systemName, streamName), EVENTHUB_ENTITY1);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), EVENTHUB_ENTITY1);
 
     // mod 2 on the partitionid to simulate consistent hashing
     configMap.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, systemName),