You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:45 UTC
[29/50] [abbrv] samza git commit: SAMZA-816: avoid starting
coordinator consumer in LocalityManager in SamzaContainer
SAMZA-816: avoid starting coordinator consumer in LocalityManager in SamzaContainer
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/429f2458
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/429f2458
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/429f2458
Branch: refs/heads/samza-sql
Commit: 429f245839bd359c1375302fa488d8c96ca83a45
Parents: e8a2ef5
Author: Yi Pan <ni...@gmail.com>
Authored: Fri Nov 20 01:19:28 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Fri Nov 20 01:19:28 2015 -0800
----------------------------------------------------------------------
.../apache/samza/container/LocalityManager.java | 43 +++++-
.../AbstractCoordinatorStreamManager.java | 12 +-
.../apache/samza/container/SamzaContainer.scala | 3 +-
.../samza/container/TestLocalityManager.java | 140 +++++++++++++++++++
.../MockCoordinatorStreamSystemFactory.java | 107 ++++++++++++++
5 files changed, 299 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index d19b574..86c9e9b 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -37,11 +37,29 @@ import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
public class LocalityManager extends AbstractCoordinatorStreamManager {
private static final Logger log = LoggerFactory.getLogger(LocalityManager.class);
private Map<Integer, Map<String, String>> containerToHostMapping;
+ private final boolean writeOnly;
+ /**
+ * Default constructor that creates a read-write manager; a null consumer yields a write-only manager
+ *
+ * @param coordinatorStreamProducer producer to the coordinator stream
+ * @param coordinatorStreamConsumer consumer for the coordinator stream
+ */
public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
super(coordinatorStreamProducer, coordinatorStreamConsumer, "SamzaContainer-");
this.containerToHostMapping = new HashMap<>();
+ this.writeOnly = coordinatorStreamConsumer == null;
+ }
+
+ /**
+ * Special constructor that creates a write-only {@link LocalityManager} that only writes
+ * to coordinator stream in {@link SamzaContainer}
+ *
+ * @param coordinatorStreamSystemProducer producer to the coordinator stream
+ */
+ public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamSystemProducer) {
+ this(coordinatorStreamSystemProducer, null);
}
/**
@@ -59,11 +77,23 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
* @param sourceSuffix the source suffix which is a container id
*/
public void register(String sourceSuffix) {
- registerCoordinatorStreamConsumer();
+ if (!this.writeOnly) {
+ registerCoordinatorStreamConsumer();
+ }
registerCoordinatorStreamProducer(getSource() + sourceSuffix);
}
+ /**
+ * Method to read container locality information from the coordinator stream. This method is used
+ * in {@link org.apache.samza.coordinator.JobCoordinator}.
+ *
+ * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
+ */
public Map<Integer, Map<String, String>> readContainerLocality() {
+ if (this.writeOnly) {
+ throw new UnsupportedOperationException("Read container locality function is not supported in write-only LocalityManager");
+ }
+
Map<Integer, Map<String, String>> allMappings = new HashMap<>();
for (CoordinatorStreamMessage message: getBootstrappedStream(SetContainerHostMapping.TYPE)) {
SetContainerHostMapping mapping = new SetContainerHostMapping(message);
@@ -78,6 +108,14 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
return allMappings;
}
+ /**
+ * Method to write locality info to coordinator stream. This method is used in {@link SamzaContainer}.
+ *
+ * @param containerId the {@link SamzaContainer} ID
+ * @param hostName the hostname
+ * @param jmxAddress the JMX URL address
+ * @param jmxTunnelingAddress the JMX Tunnel URL address
+ */
public void writeContainerToHostMapping(Integer containerId, String hostName, String jmxAddress, String jmxTunnelingAddress) {
Map<String, String> existingMappings = containerToHostMapping.get(containerId);
String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
@@ -86,7 +124,8 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
} else {
log.info("Container {} started at {}", containerId, hostName);
}
- send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress, jmxTunnelingAddress));
+ send(new SetContainerHostMapping(getSource() + containerId, String.valueOf(containerId), hostName, jmxAddress,
+ jmxTunnelingAddress));
Map<String, String> mappings = new HashMap<>();
mappings.put(SetContainerHostMapping.HOST_KEY, hostName);
mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
index ca97ce8..211b642 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
@@ -49,14 +49,18 @@ public abstract class AbstractCoordinatorStreamManager {
*/
public void start() {
coordinatorStreamProducer.start();
- coordinatorStreamConsumer.start();
+ if (coordinatorStreamConsumer != null) {
+ coordinatorStreamConsumer.start();
+ }
}
/**
* Stops the underlying coordinator stream producer and consumer.
*/
public void stop() {
- coordinatorStreamConsumer.stop();
+ if (coordinatorStreamConsumer != null) {
+ coordinatorStreamConsumer.stop();
+ }
coordinatorStreamProducer.stop();
}
@@ -74,6 +78,10 @@ public abstract class AbstractCoordinatorStreamManager {
* @return a set of {@link CoordinatorStreamMessage} if messages exists for the given source, else an empty set
*/
public Set<CoordinatorStreamMessage> getBootstrappedStream(String source) {
+ if (coordinatorStreamConsumer == null) {
+ throw new UnsupportedOperationException(String.format("CoordinatorStreamConsumer is not initialized in the AbstractCoordinatorStreamManager. "
+ + "manager registered source: %s, input source: %s", this.source, source));
+ }
return coordinatorStreamConsumer.getBootstrappedStream(source);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index db6074b..ddce148 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -304,9 +304,8 @@ object SamzaContainer extends Logging {
info("Got metrics reporters: %s" format reporters.keys)
- val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
- val localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer)
+ val localityManager = new LocalityManager(coordinatorSystemProducer)
val checkpointManager = config.getCheckpointManagerFactory() match {
case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
Util
http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
new file mode 100644
index 0000000..9661885
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.container;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Unit tests for {@link LocalityManager}
+ */
+public class TestLocalityManager {
+
+ private final MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
+ new MockCoordinatorStreamSystemFactory();
+ private final Config config = new MapConfig(
+ new HashMap<String, String>() {
+ {
+ this.put("job.name", "test-job");
+ this.put("job.coordinator.system", "test-kafka");
+ }
+ });
+
+ @Before
+ public void setup() {
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+ }
+
+ @After
+ public void tearDown() {
+ MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
+ }
+
+ @Test public void testLocalityManager() throws Exception {
+ MockCoordinatorStreamSystemProducer producer =
+ mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+ MockCoordinatorStreamSystemConsumer consumer =
+ mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
+ LocalityManager localityManager = new LocalityManager(producer, consumer);
+
+ try {
+ localityManager.register(new TaskName("task-0"));
+ fail("Should have thrown UnsupportedOperationException");
+ } catch (UnsupportedOperationException uoe) {
+ // expected
+ }
+
+ localityManager.register("containerId-0");
+ assertTrue(producer.isRegistered());
+ assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-0");
+ assertTrue(consumer.isRegistered());
+
+ localityManager.start();
+ assertTrue(producer.isStarted());
+ assertTrue(consumer.isStarted());
+
+ localityManager.writeContainerToHostMapping(0, "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090");
+ Map<Integer, Map<String, String>> localMap = localityManager.readContainerLocality();
+ Map<Integer, Map<String, String>> expectedMap =
+ new HashMap<Integer, Map<String, String>>() {
+ {
+ this.put(new Integer(0),
+ new HashMap<String, String>() {
+ {
+ this.put(SetContainerHostMapping.HOST_KEY, "localhost");
+ this.put(SetContainerHostMapping.JMX_URL_KEY, "jmx:localhost:8080");
+ this.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, "jmx:tunnel:localhost:9090");
+ }
+ });
+ }
+ };
+ assertEquals(expectedMap, localMap);
+
+ localityManager.stop();
+ assertTrue(producer.isStopped());
+ assertTrue(consumer.isStopped());
+ }
+
+ @Test public void testWriteOnlyLocalityManager() {
+ MockCoordinatorStreamSystemProducer producer =
+ mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+ LocalityManager localityManager = new LocalityManager(producer);
+
+ localityManager.register("containerId-1");
+ assertTrue(producer.isRegistered());
+ assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-1");
+
+ localityManager.start();
+ assertTrue(producer.isStarted());
+
+ localityManager.writeContainerToHostMapping(1, "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
+ try {
+ localityManager.readContainerLocality();
+ fail("Should have thrown UnsupportedOperationException");
+ } catch (UnsupportedOperationException uoe) {
+ // expected
+ }
+ assertEquals(producer.getEnvelopes().size(), 1);
+ CoordinatorStreamMessage coordinatorStreamMessage =
+ MockCoordinatorStreamSystemFactory.deserializeCoordinatorStreamMessage(producer.getEnvelopes().get(0));
+
+ SetContainerHostMapping expectedContainerMap =
+ new SetContainerHostMapping("SamzaContainer-1", "1", "localhost", "jmx:localhost:8181",
+ "jmx:tunnel:localhost:9191");
+ assertEquals(expectedContainerMap, coordinatorStreamMessage);
+
+ localityManager.stop();
+ assertTrue(producer.isStopped());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/429f2458/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
index 9d8c98e..e0d4aa1 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -22,10 +22,13 @@ package org.apache.samza.coordinator.stream;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -36,6 +39,9 @@ import org.apache.samza.util.Util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
/**
@@ -59,6 +65,14 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
mockConsumer = null;
}
+ public static CoordinatorStreamMessage deserializeCoordinatorStreamMessage(OutgoingMessageEnvelope msg) {
+ JsonSerde<List<?>> keySerde = new JsonSerde<>();
+ Object[] keyArray = keySerde.fromBytes((byte[]) msg.getKey()).toArray();
+ JsonSerde<Map<String, Object>> msgSerde = new JsonSerde<>();
+ Map<String, Object> valueMap = msgSerde.fromBytes((byte[]) msg.getMessage());
+ return new CoordinatorStreamMessage(keyArray, valueMap);
+ }
+
/**
* Returns a consumer that sends all configs to the coordinator stream.
*
@@ -87,6 +101,13 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
return mockConsumer;
}
+ private SystemStream getCoordinatorSystemStream(Config config) {
+ assertNotNull(config.get("job.coordinator.system"));
+ assertNotNull(config.get("job.name"));
+ return new SystemStream(config.get("job.coordinator.system"), Util.getCoordinatorStreamName(config.get("job.name"),
+ config.get("job.id") == null ? "1" : config.get("job.id")));
+ }
+
/**
* Returns a MockCoordinatorSystemProducer.
*/
@@ -94,6 +115,18 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
return new MockSystemProducer(null);
}
+ public MockCoordinatorStreamSystemConsumer getCoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) {
+ return new MockCoordinatorStreamSystemConsumer(getCoordinatorSystemStream(config),
+ getConsumer(config.get("job.coordinator.system"), config, registry),
+ getAdmin(config.get("job.coordinator.system"), config));
+ }
+
+ public MockCoordinatorStreamSystemProducer getCoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) {
+ return new MockCoordinatorStreamSystemProducer(getCoordinatorSystemStream(config),
+ getProducer(config.get("job.coordinator.system"), config, registry),
+ getAdmin(config.get("job.coordinator.system"), config));
+ }
+
/**
* Returns a single partition admin that pretends to create a coordinator
* stream.
@@ -102,6 +135,74 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
return new MockSystemAdmin();
}
+ public static final class MockCoordinatorStreamSystemConsumer extends CoordinatorStreamSystemConsumer {
+ private final MockCoordinatorStreamWrappedConsumer consumer;
+ private boolean isRegistered = false;
+ private boolean isStarted = false;
+
+ public MockCoordinatorStreamSystemConsumer(SystemStream stream, SystemConsumer consumer, SystemAdmin admin) {
+ super(stream, consumer, admin);
+ this.consumer = (MockCoordinatorStreamWrappedConsumer) consumer;
+ }
+
+ public MockCoordinatorStreamWrappedConsumer getConsumer() {
+ return this.consumer;
+ }
+
+ public void register() {
+ isRegistered = true;
+ }
+
+ public void start() {
+ isStarted = true;
+ }
+
+ public void stop() {
+ isStarted = false;
+ }
+
+ public boolean isRegistered() {
+ return isRegistered;
+ }
+
+ public boolean isStarted() {
+ return isStarted;
+ }
+
+ public boolean isStopped() {
+ return !isStarted;
+ }
+ }
+
+ public static final class MockCoordinatorStreamSystemProducer extends CoordinatorStreamSystemProducer {
+ private final MockSystemProducer producer;
+
+ public MockCoordinatorStreamSystemProducer(SystemStream stream, SystemProducer producer, SystemAdmin admin) {
+ super(stream, producer, admin);
+ this.producer = (MockSystemProducer) producer;
+ }
+
+ public boolean isRegistered() {
+ return this.producer.isRegistered();
+ }
+
+ public String getRegisteredSource() {
+ return this.producer.getRegisteredSource();
+ }
+
+ public boolean isStarted() {
+ return this.producer.isStarted();
+ }
+
+ public boolean isStopped() {
+ return this.producer.isStopped();
+ }
+
+ public List<OutgoingMessageEnvelope> getEnvelopes() {
+ return this.producer.getEnvelopes();
+ }
+ }
+
public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
public void createCoordinatorStream(String streamName) {
// Do nothing.
@@ -113,6 +214,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
private final List<OutgoingMessageEnvelope> envelopes;
private boolean started = false;
private boolean registered = false;
+ private String registeredSource = null;
private boolean flushed = false;
public MockSystemProducer(String expectedSource) {
@@ -131,6 +233,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
public void register(String source) {
registered = true;
+ registeredSource = source;
}
public void send(String source, OutgoingMessageEnvelope envelope) {
@@ -175,5 +278,9 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
public String getExpectedSource() {
return expectedSource;
}
+
+ public String getRegisteredSource() {
+ return registeredSource;
+ }
}
}