You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2015/11/02 23:06:39 UTC
[1/2] incubator-apex-core git commit: Fixed style violations in
StreamingContainer.java to address APEX-208
Repository: incubator-apex-core
Updated Branches:
refs/heads/devel-3 b4b7c0717 -> b1af72403
Fixed style violations in StreamingContainer.java to address APEX-208
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b0aab1ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b0aab1ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b0aab1ad
Branch: refs/heads/devel-3
Commit: b0aab1adf7f00f170c954ea5cd7d731a9fe8dfa3
Parents: af7179c
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Sun Nov 1 09:32:52 2015 -0800
Committer: Ilya Ganelin <il...@capitalone.com>
Committed: Sun Nov 1 09:32:52 2015 -0800
----------------------------------------------------------------------
.../stram/engine/StreamingContainer.java | 248 +++++++++----------
1 file changed, 118 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0aab1ad/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index a17dfdf..1544c16 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -18,7 +18,6 @@
*/
package com.datatorrent.stram.engine;
-import java.io.File;
import java.io.IOException;
import java.lang.Thread.State;
import java.lang.management.GarbageCollectorMXBean;
@@ -28,19 +27,24 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.AbstractMap.SimpleEntry;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import net.engio.mbassy.bus.MBassador;
-import net.engio.mbassy.bus.config.BusConfiguration;
-
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -52,14 +56,21 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.DTLoggerFactory;
import org.apache.log4j.LogManager;
-import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Operator.OutputPort;
import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StatsListener.OperatorRequest;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.StringCodec;
import com.datatorrent.api.annotation.Stateless;
-
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.bufferserver.storage.DiskStorage;
import com.datatorrent.bufferserver.util.Codec;
@@ -70,19 +81,45 @@ import com.datatorrent.stram.ComponentContextPair;
import com.datatorrent.stram.RecoverableRpcProxy;
import com.datatorrent.stram.StramUtils.YarnContainerMain;
import com.datatorrent.stram.StringCodecs;
-import com.datatorrent.stram.api.*;
-import com.datatorrent.stram.api.ContainerEvent.*;
+import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.stram.api.ContainerContext;
+import com.datatorrent.stram.api.ContainerEvent;
+import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent;
+import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent;
+import com.datatorrent.stram.api.ContainerEvent.NodeDeactivationEvent;
+import com.datatorrent.stram.api.ContainerEvent.StreamActivationEvent;
+import com.datatorrent.stram.api.ContainerEvent.StreamDeactivationEvent;
+import com.datatorrent.stram.api.OperatorDeployInfo;
import com.datatorrent.stram.api.OperatorDeployInfo.OperatorType;
import com.datatorrent.stram.api.OperatorDeployInfo.UnifierDeployInfo;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.*;
+import com.datatorrent.stram.api.RequestFactory;
+import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext;
import com.datatorrent.stram.debug.StdOutErrLog;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor;
import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
import com.datatorrent.stram.security.StramUserLogin;
-import com.datatorrent.stram.stream.*;
+import com.datatorrent.stram.stream.BufferServerPublisher;
+import com.datatorrent.stram.stream.BufferServerSubscriber;
+import com.datatorrent.stram.stream.FastPublisher;
+import com.datatorrent.stram.stream.FastSubscriber;
+import com.datatorrent.stram.stream.InlineStream;
+import com.datatorrent.stram.stream.MuxStream;
+import com.datatorrent.stram.stream.OiOStream;
+import com.datatorrent.stram.stream.PartitionAwareSink;
+import com.datatorrent.stram.stream.PartitionAwareSinkForPersistence;
+
+import net.engio.mbassy.bus.MBassador;
+import net.engio.mbassy.bus.config.BusConfiguration;
/**
* Object which controls the container process launched by {@link com.datatorrent.stram.StreamingAppMaster}.
@@ -131,8 +168,7 @@ public class StreamingContainer extends YarnContainerMain
static {
try {
eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
- }
- catch (IOException io) {
+ } catch (IOException io) {
throw new RuntimeException(io);
}
}
@@ -183,8 +219,7 @@ public class StreamingContainer extends YarnContainerMain
if (blocksize < 1) {
blocksize = 1;
}
- }
- else {
+ } else {
blocksize = 64;
blockCount = bufferServerRAM / blocksize;
}
@@ -196,10 +231,9 @@ public class StreamingContainer extends YarnContainerMain
}
SocketAddress bindAddr = bufferServer.run(eventloop);
logger.debug("Buffer server started: {}", bindAddr);
- this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress) bindAddr));
+ this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress)bindAddr));
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
logger.warn("deploy request failed due to {}", ex);
throw new IllegalStateException("Failed to deploy buffer server", ex);
}
@@ -210,15 +244,13 @@ public class StreamingContainer extends YarnContainerMain
singletons.put(clazz.getName(), newInstance);
if (newInstance instanceof Component) {
- components.add((Component<ContainerContext>) newInstance);
+ components.add((Component<ContainerContext>)newInstance);
}
eventBus.subscribe(newInstance);
- }
- catch (InstantiationException ex) {
+ } catch (InstantiationException ex) {
logger.warn("Container Event Listener Instantiation", ex);
- }
- catch (IllegalAccessException ex) {
+ } catch (IllegalAccessException ex) {
logger.warn("Container Event Listener Instantiation", ex);
}
}
@@ -279,24 +311,20 @@ public class StreamingContainer extends YarnContainerMain
/* main thread enters heartbeat loop */
stramChild.heartbeatLoop();
exitStatus = 0;
- }
- finally {
+ } finally {
stramChild.teardown();
}
- }
- catch (Error error) {
+ } catch (Error error) {
logger.error("Fatal error in container!", error);
/* Report back any failures, for diagnostic purposes */
String msg = ExceptionUtils.getStackTrace(error);
umbilical.reportError(childId, null, "FATAL: " + msg);
- }
- catch (Exception exception) {
+ } catch (Exception exception) {
logger.error("Fatal exception in container!", exception);
/* Report back any failures, for diagnostic purposes */
String msg = ExceptionUtils.getStackTrace(exception);
umbilical.reportError(childId, null, msg);
- }
- finally {
+ } finally {
rpcProxy.close();
DefaultMetricsSystem.shutdown();
logger.info("Exit status for container: {}", exitStatus);
@@ -317,8 +345,7 @@ public class StreamingContainer extends YarnContainerMain
Thread t = e.getValue().context.getThread();
if (t == null || !t.isAlive()) {
disconnectNode(e.getKey());
- }
- else {
+ } else {
activeThreads.add(t);
activeOperators.add(e.getKey());
e.getValue().shutdown();
@@ -334,8 +361,7 @@ public class StreamingContainer extends YarnContainerMain
}
disconnectNode(iterator.next());
}
- }
- catch (InterruptedException ex) {
+ } catch (InterruptedException ex) {
logger.warn("Aborting wait for operators to get deactivated!", ex);
}
@@ -370,15 +396,13 @@ public class StreamingContainer extends YarnContainerMain
String sinks = pair.context.getSinkId();
if (sinks == null) {
logger.error("mux sinks found connected at {} with sink id null", sourceIdentifier);
- }
- else {
+ } else {
String[] split = sinks.split(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR);
for (int i = split.length; i-- > 0; ) {
ComponentContextPair<Stream, StreamContext> spair = streams.remove(split[i]);
if (spair == null) {
logger.error("mux is missing the stream for sink {}", split[i]);
- }
- else {
+ } else {
if (activeStreams.remove(spair.component) != null) {
spair.component.deactivate();
eventBus.publish(new StreamDeactivationEvent(spair));
@@ -388,8 +412,7 @@ public class StreamingContainer extends YarnContainerMain
}
}
}
- }
- else {
+ } else {
// it's either inline stream or it's bufferserver publisher.
}
@@ -417,8 +440,7 @@ public class StreamingContainer extends YarnContainerMain
if (sourcePair == pair) {
/* for some reason we had the stream stored against both source and sink identifiers */
streams.remove(pair.context.getSourceId());
- }
- else {
+ } else {
/* the stream was one of the many streams sourced by a muxstream */
unregisterSinkFromMux(sourcePair, sinkIdentifier);
}
@@ -440,7 +462,7 @@ public class StreamingContainer extends YarnContainerMain
}
if (found) {
- ((Stream.MultiSinkCapableStream) muxpair.component).setSink(sinkIdentifier, null);
+ ((Stream.MultiSinkCapableStream)muxpair.component).setSink(sinkIdentifier, null);
if (sinks.length == 1) {
muxpair.context.setSinkId(null);
@@ -450,8 +472,7 @@ public class StreamingContainer extends YarnContainerMain
eventBus.publish(new StreamDeactivationEvent(muxpair));
}
muxpair.component.teardown();
- }
- else {
+ } else {
StringBuilder builder = new StringBuilder(muxpair.context.getSinkId().length() - MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR.length() - sinkIdentifier.length());
found = false;
@@ -459,8 +480,7 @@ public class StreamingContainer extends YarnContainerMain
if (sinks[i] != null) {
if (found) {
builder.append(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR).append(sinks[i]);
- }
- else {
+ } else {
builder.append(sinks[i]);
found = true;
}
@@ -469,8 +489,7 @@ public class StreamingContainer extends YarnContainerMain
muxpair.context.setSinkId(builder.toString());
}
- }
- else {
+ } else {
logger.error("{} was not connected to stream connected to {}", sinkIdentifier, muxpair.context.getSourceId());
}
@@ -509,11 +528,9 @@ public class StreamingContainer extends YarnContainerMain
Node<?> node = nodes.get(operatorId);
if (node == null) {
throw new IllegalArgumentException("Node " + operatorId + " is not hosted in this container!");
- }
- else if (toUndeploy.containsKey(operatorId)) {
+ } else if (toUndeploy.containsKey(operatorId)) {
throw new IllegalArgumentException("Node " + operatorId + " is requested to be undeployed more than once");
- }
- else {
+ } else {
toUndeploy.put(operatorId, node);
}
}
@@ -524,8 +541,7 @@ public class StreamingContainer extends YarnContainerMain
Thread t = nodes.get(operatorId).context.getThread();
if (t == null || !t.isAlive()) {
disconnectNode(operatorId);
- }
- else {
+ } else {
joinList.add(t);
discoList.add(operatorId);
nodes.get(operatorId).shutdown();
@@ -542,8 +558,7 @@ public class StreamingContainer extends YarnContainerMain
disconnectNode(iterator.next());
}
logger.info("Undeploy complete.");
- }
- catch (InterruptedException ex) {
+ } catch (InterruptedException ex) {
logger.warn("Aborting wait for operators to get deactivated!", ex);
}
@@ -608,8 +623,7 @@ public class StreamingContainer extends YarnContainerMain
synchronized (this.heartbeatTrigger) {
try {
this.heartbeatTrigger.wait(heartbeatIntervalMillis);
- }
- catch (InterruptedException e1) {
+ } catch (InterruptedException e1) {
logger.warn("Interrupted in heartbeat loop, exiting..");
break;
}
@@ -626,7 +640,7 @@ public class StreamingContainer extends YarnContainerMain
msg.restartRequested = true;
}
}
- msg.memoryMBFree = ((int) (Runtime.getRuntime().freeMemory() / (1024 * 1024)));
+ msg.memoryMBFree = ((int)(Runtime.getRuntime().freeMemory() / (1024 * 1024)));
garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
for (GarbageCollectorMXBean bean : garbageCollectorMXBeans) {
msg.gcCollectionTime += bean.getCollectionTime();
@@ -655,11 +669,9 @@ public class StreamingContainer extends YarnContainerMain
if (context.getThread() == null || context.getThread().getState() != Thread.State.TERMINATED) {
hb.setState(DeployState.ACTIVE);
- }
- else if (failedNodes.contains(hb.nodeId)) {
+ } else if (failedNodes.contains(hb.nodeId)) {
hb.setState(DeployState.FAILED);
- }
- else {
+ } else {
logger.debug("Reporting SHUTDOWN state because thread is {} and failedNodes is {}", context.getThread(), failedNodes);
hb.setState(DeployState.SHUTDOWN);
}
@@ -685,8 +697,7 @@ public class StreamingContainer extends YarnContainerMain
synchronized (this.heartbeatTrigger) {
try {
this.heartbeatTrigger.wait(500);
- }
- catch (InterruptedException ie) {
+ } catch (InterruptedException ie) {
logger.warn("Interrupted in heartbeat loop", ie);
break;
}
@@ -709,7 +720,7 @@ public class StreamingContainer extends YarnContainerMain
continue;
}
if (req instanceof StramToNodeChangeLoggersRequest) {
- handleChangeLoggersRequest((StramToNodeChangeLoggersRequest) req);
+ handleChangeLoggersRequest((StramToNodeChangeLoggersRequest)req);
continue;
}
@@ -725,14 +736,12 @@ public class StreamingContainer extends YarnContainerMain
logger.warn("Received request with invalid operator id {} ({})", req.getOperatorId(), req);
req.setDeleted(true);
}
- }
- else {
+ } else {
logger.debug("request received: {}", req);
OperatorRequest requestExecutor = requestFactory.getRequestExecutor(nodes.get(req.operatorId), req);
if (requestExecutor != null) {
node.context.request(requestExecutor);
- }
- else {
+ } else {
logger.warn("No executor identified for the request {}", req);
}
req.setDeleted(true);
@@ -762,7 +771,7 @@ public class StreamingContainer extends YarnContainerMain
@Override
public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
{
- ((Operator.CheckpointListener) operator).committed(lastCommittedWindowId);
+ ((Operator.CheckpointListener)operator).committed(lastCommittedWindowId);
return null;
}
@@ -790,13 +799,11 @@ public class StreamingContainer extends YarnContainerMain
logger.info("Deploy request: {}", rsp.deployRequest);
try {
deploy(rsp.deployRequest);
- }
- catch (Exception e) {
+ } catch (Exception e) {
logger.error("deploy request failed", e);
try {
umbilical.log(this.containerId, "deploy request failed: " + rsp.deployRequest + " " + ExceptionUtils.getStackTrace(e));
- }
- catch (IOException ioe) {
+ } catch (IOException ioe) {
// ignore
}
this.exitHeartbeatLoop = true;
@@ -874,11 +881,10 @@ public class StreamingContainer extends YarnContainerMain
Context parentContext;
if (ndi instanceof UnifierDeployInfo) {
- OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo) ndi).operatorAttributes, containerContext);
+ OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
massageUnifierDeployInfo(ndi);
- }
- else {
+ } else {
parentContext = containerContext;
}
@@ -958,8 +964,7 @@ public class StreamingContainer extends YarnContainerMain
deployBufferServerPublisher(connIdentifier, streamCodec, checkpointWindowId, queueCapacity, nodi);
newStreams.put(sourceIdentifier, deployBufferServerPublisher.getValue());
node.connectOutputPort(nodi.portName, deployBufferServerPublisher.getValue().component);
- }
- else {
+ } else {
/*
* In this case we have 2 possibilities, either we have 1 inline or multiple streams.
* Since we cannot tell at this point, we assume that we will have multiple streams and
@@ -1001,12 +1006,11 @@ public class StreamingContainer extends YarnContainerMain
String sinkIdentifier = pair.context.getSinkId();
if (sinkIdentifier == null) {
pair.context.setSinkId(deployBufferServerPublisher.getKey());
- }
- else {
+ } else {
pair.context.setSinkId(sinkIdentifier.concat(", ").concat(deployBufferServerPublisher.getKey()));
}
- ((Stream.MultiSinkCapableStream) pair.component).setSink(deployBufferServerPublisher.getKey(), deployBufferServerPublisher.getValue().component);
+ ((Stream.MultiSinkCapableStream)pair.component).setSink(deployBufferServerPublisher.getKey(), deployBufferServerPublisher.getValue().component);
}
}
}
@@ -1069,8 +1073,7 @@ public class StreamingContainer extends YarnContainerMain
if (ndi.checkpoint.windowId < smallestCheckpointedWindowId) {
smallestCheckpointedWindowId = ndi.checkpoint.windowId;
}
- }
- else {
+ } else {
Node<?> node = nodes.get(ndi.id);
for (OperatorDeployInfo.InputDeployInfo nidi : ndi.inputs) {
@@ -1120,7 +1123,7 @@ public class StreamingContainer extends YarnContainerMain
BufferServerSubscriber subscriber = fastPublisherSubscriber
? new FastSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity)
: new BufferServerSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity);
- if(streamCodec instanceof StreamCodecWrapperForPersistance) {
+ if (streamCodec instanceof StreamCodecWrapperForPersistance) {
subscriber.acquireReservoirForPersistStream(sinkIdentifier, queueCapacity, streamCodec);
}
SweepableReservoir reservoir = subscriber.acquireReservoir(sinkIdentifier, queueCapacity);
@@ -1131,8 +1134,7 @@ public class StreamingContainer extends YarnContainerMain
newStreams.put(sinkIdentifier, new ComponentContextPair<Stream, StreamContext>(subscriber, context));
logger.debug("put input stream {} against key {}", subscriber, sinkIdentifier);
- }
- else {
+ } else {
assert (nidi.locality == Locality.CONTAINER_LOCAL || nidi.locality == Locality.THREAD_LOCAL);
/* we are still dealing with the MuxStream originating at the output of the source port */
StreamContext inlineContext = new StreamContext(nidi.declaredStreamId);
@@ -1149,7 +1151,7 @@ public class StreamingContainer extends YarnContainerMain
stream = new InlineStream(queueCapacity);
if (checkpoint.windowId >= 0) {
- node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, (SweepableReservoir) stream, checkpoint.windowId));
+ node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, (SweepableReservoir)stream, checkpoint.windowId));
}
break;
@@ -1162,7 +1164,7 @@ public class StreamingContainer extends YarnContainerMain
throw new IllegalStateException("Locality can be either ContainerLocal or ThreadLocal");
}
- node.connectInputPort(nidi.portName, (SweepableReservoir) stream);
+ node.connectInputPort(nidi.portName, (SweepableReservoir)stream);
newStreams.put(sinkIdentifier, new ComponentContextPair<Stream, StreamContext>(stream, inlineContext));
if (!(pair.component instanceof Stream.MultiSinkCapableStream)) {
@@ -1187,27 +1189,26 @@ public class StreamingContainer extends YarnContainerMain
if (streamCodec instanceof StreamCodecWrapperForPersistance) {
PartitionAwareSinkForPersistence pas;
if (nidi.partitionKeys == null) {
- pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionMask, stream);
+ pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>)streamCodec, nidi.partitionMask, stream);
} else {
- pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
+ pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>)streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
}
- ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, pas);
+ ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas);
} else if (nidi.partitionKeys == null || nidi.partitionKeys.isEmpty()) {
- ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, stream);
+ ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, stream);
} else {
/*
* generally speaking we do not have partitions on the inline streams so the control should not
* come here but if it comes, then we are ready to handle it using the partition aware streams.
*/
- PartitionAwareSink<Object> pas = new PartitionAwareSink<Object>(streamCodec == null ? nonSerializingStreamCodec : (StreamCodec<Object>) streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
- ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, pas);
+ PartitionAwareSink<Object> pas = new PartitionAwareSink<Object>(streamCodec == null ? nonSerializingStreamCodec : (StreamCodec<Object>)streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
+ ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas);
}
String streamSinkId = pair.context.getSinkId();
if (streamSinkId == null) {
pair.context.setSinkId(sinkIdentifier);
- }
- else {
+ } else {
pair.context.setSinkId(streamSinkId.concat(", ").concat(sinkIdentifier));
}
}
@@ -1275,7 +1276,7 @@ public class StreamingContainer extends YarnContainerMain
windowGenerator.setWindowWidth(windowWidthMillis);
long windowCount = WindowGenerator.getWindowCount(millisAtFirstWindow, firstWindowMillis, windowWidthMillis);
- windowGenerator.setCheckpointCount(checkpointWindowCount, (int) (windowCount % checkpointWindowCount));
+ windowGenerator.setCheckpointCount(checkpointWindowCount, (int)(windowCount % checkpointWindowCount));
return windowGenerator;
}
@@ -1320,8 +1321,7 @@ public class StreamingContainer extends YarnContainerMain
final Node<?> node = nodes.get(ndi.id);
if (node == null) {
logger.warn("node {}/{} took longer to exit, resulting in unclean undeploy!", ndi.id, ndi.name);
- }
- else {
+ } else {
eventBus.publish(new NodeDeactivationEvent(node));
node.deactivate();
node.teardown();
@@ -1378,8 +1378,7 @@ public class StreamingContainer extends YarnContainerMain
}
node.run(); /* this is a blocking call */
- }
- catch (Error error) {
+ } catch (Error error) {
int[] operators;
if (currentdi == null) {
logger.error("Voluntary container termination due to an error in operator set {}.", setOperators, error);
@@ -1388,39 +1387,33 @@ public class StreamingContainer extends YarnContainerMain
for (Iterator<OperatorDeployInfo> it = setOperators.iterator(); it.hasNext(); i++) {
operators[i] = it.next().id;
}
- }
- else {
+ } else {
logger.error("Voluntary container termination due to an error in operator {}.", currentdi, error);
operators = new int[]{currentdi.id};
}
umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error));
System.exit(1);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
if (currentdi == null) {
failedNodes.add(ndi.id);
logger.error("Operator set {} stopped running due to an exception.", setOperators, ex);
int[] operators = new int[]{ndi.id};
umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex));
- }
- else {
+ } else {
failedNodes.add(currentdi.id);
logger.error("Abandoning deployment of operator {} due to setup failure.", currentdi, ex);
int[] operators = new int[]{currentdi.id};
umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex));
}
- }
- finally {
+ } finally {
if (setOperators.contains(ndi)) {
try {
teardownNode(ndi);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
failedNodes.add(ndi.id);
logger.error("Shutdown of operator {} failed due to an exception.", ndi, ex);
}
- }
- else {
+ } else {
signal.countDown();
}
@@ -1431,13 +1424,11 @@ public class StreamingContainer extends YarnContainerMain
if (setOperators.contains(oiodi)) {
try {
teardownNode(oiodi);
- }
- catch (Exception ex) {
+ } catch (Exception ex) {
failedNodes.add(oiodi.id);
logger.error("Shutdown of operator {} failed due to an exception.", oiodi, ex);
}
- }
- else {
+ } else {
signal.countDown();
}
}
@@ -1453,8 +1444,7 @@ public class StreamingContainer extends YarnContainerMain
*/
try {
signal.await();
- }
- catch (InterruptedException ex) {
+ } catch (InterruptedException ex) {
logger.debug("Activation of operators interruped.", ex);
}
@@ -1509,17 +1499,16 @@ public class StreamingContainer extends YarnContainerMain
if (temp == null) {
temp = containerContext.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
}
- int appWindowCount = (int) (windowCount % temp);
+ int appWindowCount = (int)(windowCount % temp);
temp = ndi.contextAttributes.get(OperatorContext.CHECKPOINT_WINDOW_COUNT);
if (temp == null) {
temp = containerContext.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT);
}
- int lCheckpointWindowCount = (int) (windowCount % temp);
+ int lCheckpointWindowCount = (int)(windowCount % temp);
checkpoint = new Checkpoint(WindowGenerator.getWindowId(now, firstWindowMillis, windowWidthMillis), appWindowCount, lCheckpointWindowCount);
logger.debug("using {} on {} at {}", ProcessingMode.AT_MOST_ONCE, ndi.name, checkpoint);
- }
- else {
+ } else {
checkpoint = ndi.checkpoint;
logger.debug("using {} on {} at {}", ndi.contextAttributes == null ? ProcessingMode.AT_LEAST_ONCE :
(ndi.contextAttributes.get(OperatorContext.PROCESSING_MODE) == null ? ProcessingMode.AT_LEAST_ONCE :
@@ -1535,8 +1524,7 @@ public class StreamingContainer extends YarnContainerMain
for (Component<ContainerContext> c : components) {
c.setup(ctx);
}
- }
- else {
+ } else {
for (Component<ContainerContext> c : components) {
c.teardown();
}
[2/2] incubator-apex-core git commit: Merge branch
'StyleFixesStreamingContainer' of
https://github.com/ilganeli/incubator-apex-core into devel-3
Posted by th...@apache.org.
Merge branch 'StyleFixesStreamingContainer' of https://github.com/ilganeli/incubator-apex-core into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b1af7240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b1af7240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b1af7240
Branch: refs/heads/devel-3
Commit: b1af72403ea37d087d3b9504b328d2da876d6e0d
Parents: b4b7c07 b0aab1a
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 2 13:53:16 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 2 13:53:16 2015 -0800
----------------------------------------------------------------------
.../stram/engine/StreamingContainer.java | 248 +++++++++----------
1 file changed, 118 insertions(+), 130 deletions(-)
----------------------------------------------------------------------