You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:45 UTC

[29/50] [abbrv] samza git commit: SAMZA-816: avoid starting coordinator consumer in LocalityManager in SamzaContainer

SAMZA-816: avoid starting coordinator consumer in LocalityManager in SamzaContainer


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/429f2458
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/429f2458
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/429f2458

Branch: refs/heads/samza-sql
Commit: 429f245839bd359c1375302fa488d8c96ca83a45
Parents: e8a2ef5
Author: Yi Pan <ni...@gmail.com>
Authored: Fri Nov 20 01:19:28 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Nov 20 01:19:28 2015 -0800

----------------------------------------------------------------------
 .../apache/samza/container/LocalityManager.java |  43 +++++-
 .../AbstractCoordinatorStreamManager.java       |  12 +-
 .../apache/samza/container/SamzaContainer.scala |   3 +-
 .../samza/container/TestLocalityManager.java    | 140 +++++++++++++++++++
 .../MockCoordinatorStreamSystemFactory.java     | 107 ++++++++++++++
 5 files changed, 299 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index d19b574..86c9e9b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -37,11 +37,29 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 public class LocalityManager extends AbstractCoordinatorStreamManager {
   private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
   private Map<Integer, Map<String, String>> containerToHostMapping;
+  private final boolean writeOnly;
 
+  /**
+   * Default constructor that creates a read-write manager
+   *
+   * @param coordinatorStreamProducer producer to the coordinator stream
+   * @param coordinatorStreamConsumer consumer for the coordinator stream
+   */
   public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
                          CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
     super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaContainer-");
     this.containerToHostMapping = new HashMap<>();
+    this.writeOnly = coordinatorStreamConsumer == null;
+  }
+
+  /**
+   * Special constructor that creates a write-only {@link LocalityManager} that only writes
+   * to coordinator stream in {@link SamzaContainer}
+   *
+   * @param coordinatorStreamSystemProducer producer to the coordinator stream
+   */
+  public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) {
+    this(coordinatorStreamSystemProducer, null);
   }
 
   /**
@@ -59,11 +77,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
    * @param sourceSuffix the source suffix which is a container id
    */
   public void register(String sourceSuffix) {
-    registerCoordinatorStreamConsumer();
+    if (!this.writeOnly) {
+      registerCoordinatorStreamConsumer();
+    }
     registerCoordinatorStreamProducer(getSource() + sourceSuffix);
   }
 
+  /**
+   * Method to allow read container locality information from coordinator stream. This method is used
+   * in {@link org.apache.samza.coordinator.JobCoordinator}.
+   *
+   * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
+   */
   public Map<Integer, Map<String, String>> readContainerLocality() {
+    if (this.writeOnly) {
+      throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager");
+    }
+
     Map<Integer, Map<String, String>> allMappings = new HashMap<>();
     for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping mapping = new SetContainerHostMapping(message);
@@ -78,6 +108,14 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
     return allMappings;
   }
 
+  /**
+   * Method to write locality info to coordinator stream. This method is used in {@link SamzaContainer}.
+   *
+   * @param containerId  the {@link SamzaContainer} ID
+   * @param hostName  the hostname
+   * @param jmxAddress  the JMX URL address
+   * @param jmxTunnelingAddress  the JMX Tunnel URL address
+   */
   public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
     Map<String, String> existingMappings = containerToHostMapping.get(containerId);
     String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
@@ -86,7 +124,8 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
     } else {
       log.info("Container {} started at {}", containerId, hostName);
     }
-    send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress, jmxTunnelingAddress));
+    send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress,
+        jmxTunnelingAddress));
     Map<String, String> mappings = new HashMap<>();
     mappings.put(SetContainerHostMapping.HOST_KEY, hostName);
     mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);

http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
index ca97ce8..211b642 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
@@ -49,14 +49,18 @@ public abstract class AbstractCoordinatorStreamManager {
    */
   public void start() {
     coordinatorStreamProducer.start();
-    coordinatorStreamConsumer.start();
+    if (coordinatorStreamConsumer != null) {
+      coordinatorStreamConsumer.start();
+    }
   }
 
   /**
    * Stops the underlying coordinator stream producer and consumer.
    */
   public void stop() {
-    coordinatorStreamConsumer.stop();
+    if (coordinatorStreamConsumer != null) {
+      coordinatorStreamConsumer.stop();
+    }
     coordinatorStreamProducer.stop();
   }
 
@@ -74,6 +78,10 @@ public abstract class AbstractCoordinatorStreamManager {
    * @return a set of {@link CoordinatorStreamMessage} if messages exists for the given source, else an empty set
    */
   public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) {
+    if (coordinatorStreamConsumer == null) {
+      throw new UnsupportedOperationException(String.format("CoordinatorStreamConsumer is not initialized in the AbstractCoordinatorStreamManager. "
+          + "manager registered source: %s, input source: %s", this.source, source));
+    }
     return coordinatorStreamConsumer.getBootstrappedStream(source);
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index db6074b..ddce148 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -304,9 +304,8 @@ object SamzaContainer extends Logging {
 
     info("Got metrics reporters: %s" format reporters.keys)
 
-    val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
     val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
-    val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+    val localityManager = new LocalityManager(coordinatorSystemProducer)
     val checkpointManager = config.getCheckpointManagerFactory() match {
       case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
         Util

http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
new file mode 100644
index 0000000..9661885
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Unit tests for {@link LocalityManager}
+ */
+public class TestLocalityManager {
+
+  private final MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+      new MockCoordinatorStreamSystemFactory();
+  private final Config config = new MapConfig(
+      new HashMap<String, String>() {
+        {
+          this.put("job.name", "test-job");
+          this.put("job.coordinator.system", "test-kafka");
+        }
+      });
+
+  @Before
+  public void setup() {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+  }
+
+  @After
+  public void tearDown() {
+    MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+  }
+
+  @Test public void testLocalityManager() throws Exception {
+    MockCoordinatorStreamSystemProducer producer =
+        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+    MockCoordinatorStreamSystemConsumer consumer =
+        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+    LocalityManager localityManager = new LocalityManager(producer, consumer);
+
+    try {
+      localityManager.register(new TaskName("task-0"));
+      fail("Should have thrown UnsupportedOperationException");
+    } catch (UnsupportedOperationException uoe) {
+      // expected
+    }
+
+    localityManager.register("containerId-0");
+    assertTrue(producer.isRegistered());
+    assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-0");
+    assertTrue(consumer.isRegistered());
+
+    localityManager.start();
+    assertTrue(producer.isStarted());
+    assertTrue(consumer.isStarted());
+
+    localityManager.writeContainerToHostMapping(0, "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090");
+    Map<Integer, Map<String, String>> localMap = localityManager.readContainerLocality();
+    Map<Integer, Map<String, String>> expectedMap =
+        new HashMap<Integer, Map<String, String>>() {
+          {
+            this.put(new Integer(0),
+                new HashMap<String, String>() {
+                  {
+                    this.put(SetContainerHostMapping.HOST_KEY, "localhost");
+                    this.put(SetContainerHostMapping.JMX_URL_KEY, "jmx:localhost:8080");
+                    this.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, "jmx:tunnel:localhost:9090");
+                  }
+                });
+          }
+        };
+    assertEquals(expectedMap, localMap);
+
+    localityManager.stop();
+    assertTrue(producer.isStopped());
+    assertTrue(consumer.isStopped());
+  }
+
+  @Test public void testWriteOnlyLocalityManager() {
+    MockCoordinatorStreamSystemProducer producer =
+        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+    LocalityManager localityManager = new LocalityManager(producer);
+
+    localityManager.register("containerId-1");
+    assertTrue(producer.isRegistered());
+    assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-1");
+
+    localityManager.start();
+    assertTrue(producer.isStarted());
+
+    localityManager.writeContainerToHostMapping(1, "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
+    try {
+      localityManager.readContainerLocality();
+      fail("Should have thrown UnsupportedOperationException");
+    } catch (UnsupportedOperationException uoe) {
+      // expected
+    }
+    assertEquals(producer.getEnvelopes().size(), 1);
+    CoordinatorStreamMessage coordinatorStreamMessage =
+        MockCoordinatorStreamSystemFactory.deserializeCoordinatorStreamMessage(producer.getEnvelopes().get(0));
+
+    SetContainerHostMapping expectedContainerMap =
+        new SetContainerHostMapping("SamzaContainer-1", "1", "localhost", "jmx:localhost:8181",
+            "jmx:tunnel:localhost:9191");
+    assertEquals(expectedContainerMap, coordinatorStreamMessage);
+
+    localityManager.stop();
+    assertTrue(producer.isStopped());
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 9d8c98e..e0d4aa1 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -22,10 +22,13 @@ package org.apache.samza.coordinator.stream;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -36,6 +39,9 @@ import org.apache.samza.util.Util;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
 
 
 /**
@@ -59,6 +65,14 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     mockConsumer = null;
   }
 
+  public static CoordinatorStreamMessage deserializeCoordinatorStreamMessage(OutgoingMessageEnvelope msg) {
+    JsonSerde<List<?>> keySerde = new JsonSerde<>();
+    Object[] keyArray = keySerde.fromBytes((byte[]) msg.getKey()).toArray();
+    JsonSerde<Map<String, Object>> msgSerde = new JsonSerde<>();
+    Map<String, Object> valueMap = msgSerde.fromBytes((byte[]) msg.getMessage());
+    return new CoordinatorStreamMessage(keyArray, valueMap);
+  }
+
   /**
    * Returns a consumer that sends all configs to the coordinator stream.
    *
@@ -87,6 +101,13 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     return mockConsumer;
   }
 
+  private SystemStream getCoordinatorSystemStream(Config config) {
+    assertNotNull(config.get("job.coordinator.system"));
+    assertNotNull(config.get("job.name"));
+    return new SystemStream(config.get("job.coordinator.system"), Util.getCoordinatorStreamName(config.get("job.name"),
+        config.get("job.id") == null ? "1" : config.get("job.id")));
+  }
+
   /**
    * Returns a MockCoordinatorSystemProducer.
    */
@@ -94,6 +115,18 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     return new MockSystemProducer(null);
   }
 
+  public MockCoordinatorStreamSystemConsumer getCoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) {
+    return new MockCoordinatorStreamSystemConsumer(getCoordinatorSystemStream(config),
+        getConsumer(config.get("job.coordinator.system"), config, registry),
+        getAdmin(config.get("job.coordinator.system"), config));
+  }
+
+  public MockCoordinatorStreamSystemProducer getCoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) {
+    return new MockCoordinatorStreamSystemProducer(getCoordinatorSystemStream(config),
+        getProducer(config.get("job.coordinator.system"), config, registry),
+        getAdmin(config.get("job.coordinator.system"), config));
+  }
+
   /**
    * Returns a single partition admin that pretends to create a coordinator
    * stream.
@@ -102,6 +135,74 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     return new MockSystemAdmin();
   }
 
+  public static final class MockCoordinatorStreamSystemConsumer extends CoordinatorStreamSystemConsumer {
+    private final MockCoordinatorStreamWrappedConsumer consumer;
+    private boolean isRegistered = false;
+    private boolean isStarted = false;
+
+    public MockCoordinatorStreamSystemConsumer(SystemStream stream, SystemConsumer consumer, SystemAdmin admin) {
+      super(stream, consumer, admin);
+      this.consumer = (MockCoordinatorStreamWrappedConsumer) consumer;
+    }
+
+    public MockCoordinatorStreamWrappedConsumer getConsumer() {
+      return this.consumer;
+    }
+
+    public void register() {
+      isRegistered = true;
+    }
+
+    public void start() {
+      isStarted = true;
+    }
+
+    public void stop() {
+      isStarted = false;
+    }
+
+    public boolean isRegistered() {
+      return isRegistered;
+    }
+
+    public boolean isStarted() {
+      return isStarted;
+    }
+
+    public boolean isStopped() {
+      return !isStarted;
+    }
+  }
+
+  public static final class MockCoordinatorStreamSystemProducer extends CoordinatorStreamSystemProducer {
+    private final MockSystemProducer producer;
+
+    public MockCoordinatorStreamSystemProducer(SystemStream stream, SystemProducer producer, SystemAdmin admin) {
+      super(stream, producer, admin);
+      this.producer = (MockSystemProducer) producer;
+    }
+
+    public boolean isRegistered() {
+      return this.producer.isRegistered();
+    }
+
+    public String getRegisteredSource() {
+      return this.producer.getRegisteredSource();
+    }
+
+    public boolean isStarted() {
+      return this.producer.isStarted();
+    }
+
+    public boolean isStopped() {
+      return this.producer.isStopped();
+    }
+
+    public List<OutgoingMessageEnvelope> getEnvelopes() {
+      return this.producer.getEnvelopes();
+    }
+  }
+
   public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
     public void createCoordinatorStream(String streamName) {
       // Do nothing.
@@ -113,6 +214,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     private final List<OutgoingMessageEnvelope> envelopes;
     private boolean started = false;
     private boolean registered = false;
+    private String registeredSource = null;
     private boolean flushed = false;
 
     public MockSystemProducer(String expectedSource) {
@@ -131,6 +233,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
 
     public void register(String source) {
       registered = true;
+      registeredSource = source;
     }
 
     public void send(String source, OutgoingMessageEnvelope envelope) {
@@ -175,5 +278,9 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     public String getExpectedSource() {
       return expectedSource;
     }
+
+    public String getRegisteredSource() {
+      return registeredSource;
+    }
   }
 }