You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2015/11/09 17:34:03 UTC

[1/9] incubator-apex-core git commit: Fixed style violations in StreamingContainer.java to address APEX-208

Repository: incubator-apex-core
Updated Branches:
  refs/heads/feature-module 013ccbeee -> b85de3d7b


Fixed style violations in StreamingContainer.java to address APEX-208


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b0aab1ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b0aab1ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b0aab1ad

Branch: refs/heads/feature-module
Commit: b0aab1adf7f00f170c954ea5cd7d731a9fe8dfa3
Parents: af7179c
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Sun Nov 1 09:32:52 2015 -0800
Committer: Ilya Ganelin <il...@capitalone.com>
Committed: Sun Nov 1 09:32:52 2015 -0800

----------------------------------------------------------------------
 .../stram/engine/StreamingContainer.java        | 248 +++++++++----------
 1 file changed, 118 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/b0aab1ad/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index a17dfdf..1544c16 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.stram.engine;
 
-import java.io.File;
 import java.io.IOException;
 import java.lang.Thread.State;
 import java.lang.management.GarbageCollectorMXBean;
@@ -28,19 +27,24 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.AbstractMap.SimpleEntry;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 
-import net.engio.mbassy.bus.MBassador;
-import net.engio.mbassy.bus.config.BusConfiguration;
-
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -52,14 +56,21 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.DTLoggerFactory;
 import org.apache.log4j.LogManager;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.StreamCodec;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.StringCodec;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.bufferserver.server.Server;
 import com.datatorrent.bufferserver.storage.DiskStorage;
 import com.datatorrent.bufferserver.util.Codec;
@@ -70,19 +81,45 @@ import com.datatorrent.stram.ComponentContextPair;
 import com.datatorrent.stram.RecoverableRpcProxy;
 import com.datatorrent.stram.StramUtils.YarnContainerMain;
 import com.datatorrent.stram.StringCodecs;
-import com.datatorrent.stram.api.*;
-import com.datatorrent.stram.api.ContainerEvent.*;
+import com.datatorrent.stram.api.Checkpoint;
+import com.datatorrent.stram.api.ContainerContext;
+import com.datatorrent.stram.api.ContainerEvent;
+import com.datatorrent.stram.api.ContainerEvent.ContainerStatsEvent;
+import com.datatorrent.stram.api.ContainerEvent.NodeActivationEvent;
+import com.datatorrent.stram.api.ContainerEvent.NodeDeactivationEvent;
+import com.datatorrent.stram.api.ContainerEvent.StreamActivationEvent;
+import com.datatorrent.stram.api.ContainerEvent.StreamDeactivationEvent;
+import com.datatorrent.stram.api.OperatorDeployInfo;
 import com.datatorrent.stram.api.OperatorDeployInfo.OperatorType;
 import com.datatorrent.stram.api.OperatorDeployInfo.UnifierDeployInfo;
-import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.*;
+import com.datatorrent.stram.api.RequestFactory;
+import com.datatorrent.stram.api.StramToNodeChangeLoggersRequest;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeat;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerHeartbeatResponse;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.ContainerStats;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat;
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StramToNodeRequest;
+import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.StreamingContainerContext;
 import com.datatorrent.stram.debug.StdOutErrLog;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.Operators.PortContextPair;
 import com.datatorrent.stram.plan.logical.Operators.PortMappingDescriptor;
 import com.datatorrent.stram.plan.logical.StreamCodecWrapperForPersistance;
 import com.datatorrent.stram.security.StramUserLogin;
-import com.datatorrent.stram.stream.*;
+import com.datatorrent.stram.stream.BufferServerPublisher;
+import com.datatorrent.stram.stream.BufferServerSubscriber;
+import com.datatorrent.stram.stream.FastPublisher;
+import com.datatorrent.stram.stream.FastSubscriber;
+import com.datatorrent.stram.stream.InlineStream;
+import com.datatorrent.stram.stream.MuxStream;
+import com.datatorrent.stram.stream.OiOStream;
+import com.datatorrent.stram.stream.PartitionAwareSink;
+import com.datatorrent.stram.stream.PartitionAwareSinkForPersistence;
+
+import net.engio.mbassy.bus.MBassador;
+import net.engio.mbassy.bus.config.BusConfiguration;
 
 /**
  * Object which controls the container process launched by {@link com.datatorrent.stram.StreamingAppMaster}.
@@ -131,8 +168,7 @@ public class StreamingContainer extends YarnContainerMain
   static {
     try {
       eventloop = DefaultEventLoop.createEventLoop("ProcessWideEventLoop");
-    }
-    catch (IOException io) {
+    } catch (IOException io) {
       throw new RuntimeException(io);
     }
   }
@@ -183,8 +219,7 @@ public class StreamingContainer extends YarnContainerMain
           if (blocksize < 1) {
             blocksize = 1;
           }
-        }
-        else {
+        } else {
           blocksize = 64;
           blockCount = bufferServerRAM / blocksize;
         }
@@ -196,10 +231,9 @@ public class StreamingContainer extends YarnContainerMain
         }
         SocketAddress bindAddr = bufferServer.run(eventloop);
         logger.debug("Buffer server started: {}", bindAddr);
-        this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress) bindAddr));
+        this.bufferServerAddress = NetUtils.getConnectAddress(((InetSocketAddress)bindAddr));
       }
-    }
-    catch (IOException ex) {
+    } catch (IOException ex) {
       logger.warn("deploy request failed due to {}", ex);
       throw new IllegalStateException("Failed to deploy buffer server", ex);
     }
@@ -210,15 +244,13 @@ public class StreamingContainer extends YarnContainerMain
         singletons.put(clazz.getName(), newInstance);
 
         if (newInstance instanceof Component) {
-          components.add((Component<ContainerContext>) newInstance);
+          components.add((Component<ContainerContext>)newInstance);
         }
 
         eventBus.subscribe(newInstance);
-      }
-      catch (InstantiationException ex) {
+      } catch (InstantiationException ex) {
         logger.warn("Container Event Listener Instantiation", ex);
-      }
-      catch (IllegalAccessException ex) {
+      } catch (IllegalAccessException ex) {
         logger.warn("Container Event Listener Instantiation", ex);
       }
     }
@@ -279,24 +311,20 @@ public class StreamingContainer extends YarnContainerMain
         /* main thread enters heartbeat loop */
         stramChild.heartbeatLoop();
         exitStatus = 0;
-      }
-      finally {
+      } finally {
         stramChild.teardown();
       }
-    }
-    catch (Error error) {
+    } catch (Error error) {
       logger.error("Fatal error in container!", error);
       /* Report back any failures, for diagnostic purposes */
       String msg = ExceptionUtils.getStackTrace(error);
       umbilical.reportError(childId, null, "FATAL: " + msg);
-    }
-    catch (Exception exception) {
+    } catch (Exception exception) {
       logger.error("Fatal exception in container!", exception);
       /* Report back any failures, for diagnostic purposes */
       String msg = ExceptionUtils.getStackTrace(exception);
       umbilical.reportError(childId, null, msg);
-    }
-    finally {
+    } finally {
       rpcProxy.close();
       DefaultMetricsSystem.shutdown();
       logger.info("Exit status for container: {}", exitStatus);
@@ -317,8 +345,7 @@ public class StreamingContainer extends YarnContainerMain
       Thread t = e.getValue().context.getThread();
       if (t == null || !t.isAlive()) {
         disconnectNode(e.getKey());
-      }
-      else {
+      } else {
         activeThreads.add(t);
         activeOperators.add(e.getKey());
         e.getValue().shutdown();
@@ -334,8 +361,7 @@ public class StreamingContainer extends YarnContainerMain
         }
         disconnectNode(iterator.next());
       }
-    }
-    catch (InterruptedException ex) {
+    } catch (InterruptedException ex) {
       logger.warn("Aborting wait for operators to get deactivated!", ex);
     }
 
@@ -370,15 +396,13 @@ public class StreamingContainer extends YarnContainerMain
           String sinks = pair.context.getSinkId();
           if (sinks == null) {
             logger.error("mux sinks found connected at {} with sink id null", sourceIdentifier);
-          }
-          else {
+          } else {
             String[] split = sinks.split(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR);
             for (int i = split.length; i-- > 0; ) {
               ComponentContextPair<Stream, StreamContext> spair = streams.remove(split[i]);
               if (spair == null) {
                 logger.error("mux is missing the stream for sink {}", split[i]);
-              }
-              else {
+              } else {
                 if (activeStreams.remove(spair.component) != null) {
                   spair.component.deactivate();
                   eventBus.publish(new StreamDeactivationEvent(spair));
@@ -388,8 +412,7 @@ public class StreamingContainer extends YarnContainerMain
               }
             }
           }
-        }
-        else {
+        } else {
           // it's either inline stream or it's bufferserver publisher.
         }
 
@@ -417,8 +440,7 @@ public class StreamingContainer extends YarnContainerMain
           if (sourcePair == pair) {
             /* for some reason we had the stream stored against both source and sink identifiers */
             streams.remove(pair.context.getSourceId());
-          }
-          else {
+          } else {
             /* the stream was one of the many streams sourced by a muxstream */
             unregisterSinkFromMux(sourcePair, sinkIdentifier);
           }
@@ -440,7 +462,7 @@ public class StreamingContainer extends YarnContainerMain
     }
 
     if (found) {
-      ((Stream.MultiSinkCapableStream) muxpair.component).setSink(sinkIdentifier, null);
+      ((Stream.MultiSinkCapableStream)muxpair.component).setSink(sinkIdentifier, null);
 
       if (sinks.length == 1) {
         muxpair.context.setSinkId(null);
@@ -450,8 +472,7 @@ public class StreamingContainer extends YarnContainerMain
           eventBus.publish(new StreamDeactivationEvent(muxpair));
         }
         muxpair.component.teardown();
-      }
-      else {
+      } else {
         StringBuilder builder = new StringBuilder(muxpair.context.getSinkId().length() - MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR.length() - sinkIdentifier.length());
 
         found = false;
@@ -459,8 +480,7 @@ public class StreamingContainer extends YarnContainerMain
           if (sinks[i] != null) {
             if (found) {
               builder.append(MuxStream.MULTI_SINK_ID_CONCAT_SEPARATOR).append(sinks[i]);
-            }
-            else {
+            } else {
               builder.append(sinks[i]);
               found = true;
             }
@@ -469,8 +489,7 @@ public class StreamingContainer extends YarnContainerMain
 
         muxpair.context.setSinkId(builder.toString());
       }
-    }
-    else {
+    } else {
       logger.error("{} was not connected to stream connected to {}", sinkIdentifier, muxpair.context.getSourceId());
     }
 
@@ -509,11 +528,9 @@ public class StreamingContainer extends YarnContainerMain
       Node<?> node = nodes.get(operatorId);
       if (node == null) {
         throw new IllegalArgumentException("Node " + operatorId + " is not hosted in this container!");
-      }
-      else if (toUndeploy.containsKey(operatorId)) {
+      } else if (toUndeploy.containsKey(operatorId)) {
         throw new IllegalArgumentException("Node " + operatorId + " is requested to be undeployed more than once");
-      }
-      else {
+      } else {
         toUndeploy.put(operatorId, node);
       }
     }
@@ -524,8 +541,7 @@ public class StreamingContainer extends YarnContainerMain
       Thread t = nodes.get(operatorId).context.getThread();
       if (t == null || !t.isAlive()) {
         disconnectNode(operatorId);
-      }
-      else {
+      } else {
         joinList.add(t);
         discoList.add(operatorId);
         nodes.get(operatorId).shutdown();
@@ -542,8 +558,7 @@ public class StreamingContainer extends YarnContainerMain
         disconnectNode(iterator.next());
       }
       logger.info("Undeploy complete.");
-    }
-    catch (InterruptedException ex) {
+    } catch (InterruptedException ex) {
       logger.warn("Aborting wait for operators to get deactivated!", ex);
     }
 
@@ -608,8 +623,7 @@ public class StreamingContainer extends YarnContainerMain
       synchronized (this.heartbeatTrigger) {
         try {
           this.heartbeatTrigger.wait(heartbeatIntervalMillis);
-        }
-        catch (InterruptedException e1) {
+        } catch (InterruptedException e1) {
           logger.warn("Interrupted in heartbeat loop, exiting..");
           break;
         }
@@ -626,7 +640,7 @@ public class StreamingContainer extends YarnContainerMain
           msg.restartRequested = true;
         }
       }
-      msg.memoryMBFree = ((int) (Runtime.getRuntime().freeMemory() / (1024 * 1024)));
+      msg.memoryMBFree = ((int)(Runtime.getRuntime().freeMemory() / (1024 * 1024)));
       garbageCollectorMXBeans = ManagementFactory.getGarbageCollectorMXBeans();
       for (GarbageCollectorMXBean bean : garbageCollectorMXBeans) {
         msg.gcCollectionTime += bean.getCollectionTime();
@@ -655,11 +669,9 @@ public class StreamingContainer extends YarnContainerMain
 
           if (context.getThread() == null || context.getThread().getState() != Thread.State.TERMINATED) {
             hb.setState(DeployState.ACTIVE);
-          }
-          else if (failedNodes.contains(hb.nodeId)) {
+          } else if (failedNodes.contains(hb.nodeId)) {
             hb.setState(DeployState.FAILED);
-          }
-          else {
+          } else {
             logger.debug("Reporting SHUTDOWN state because thread is {} and failedNodes is {}", context.getThread(), failedNodes);
             hb.setState(DeployState.SHUTDOWN);
           }
@@ -685,8 +697,7 @@ public class StreamingContainer extends YarnContainerMain
           synchronized (this.heartbeatTrigger) {
             try {
               this.heartbeatTrigger.wait(500);
-            }
-            catch (InterruptedException ie) {
+            } catch (InterruptedException ie) {
               logger.warn("Interrupted in heartbeat loop", ie);
               break;
             }
@@ -709,7 +720,7 @@ public class StreamingContainer extends YarnContainerMain
         continue;
       }
       if (req instanceof StramToNodeChangeLoggersRequest) {
-        handleChangeLoggersRequest((StramToNodeChangeLoggersRequest) req);
+        handleChangeLoggersRequest((StramToNodeChangeLoggersRequest)req);
         continue;
       }
 
@@ -725,14 +736,12 @@ public class StreamingContainer extends YarnContainerMain
           logger.warn("Received request with invalid operator id {} ({})", req.getOperatorId(), req);
           req.setDeleted(true);
         }
-      }
-      else {
+      } else {
         logger.debug("request received: {}", req);
         OperatorRequest requestExecutor = requestFactory.getRequestExecutor(nodes.get(req.operatorId), req);
         if (requestExecutor != null) {
           node.context.request(requestExecutor);
-        }
-        else {
+        } else {
           logger.warn("No executor identified for the request {}", req);
         }
         req.setDeleted(true);
@@ -762,7 +771,7 @@ public class StreamingContainer extends YarnContainerMain
               @Override
               public StatsListener.OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException
               {
-                ((Operator.CheckpointListener) operator).committed(lastCommittedWindowId);
+                ((Operator.CheckpointListener)operator).committed(lastCommittedWindowId);
                 return null;
               }
 
@@ -790,13 +799,11 @@ public class StreamingContainer extends YarnContainerMain
       logger.info("Deploy request: {}", rsp.deployRequest);
       try {
         deploy(rsp.deployRequest);
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         logger.error("deploy request failed", e);
         try {
           umbilical.log(this.containerId, "deploy request failed: " + rsp.deployRequest + " " + ExceptionUtils.getStackTrace(e));
-        }
-        catch (IOException ioe) {
+        } catch (IOException ioe) {
           // ignore
         }
         this.exitHeartbeatLoop = true;
@@ -874,11 +881,10 @@ public class StreamingContainer extends YarnContainerMain
 
       Context parentContext;
       if (ndi instanceof UnifierDeployInfo) {
-        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo) ndi).operatorAttributes, containerContext);
+        OperatorContext unifiedOperatorContext = new OperatorContext(0, ((UnifierDeployInfo)ndi).operatorAttributes, containerContext);
         parentContext = new PortContext(ndi.inputs.get(0).contextAttributes, unifiedOperatorContext);
         massageUnifierDeployInfo(ndi);
-      }
-      else {
+      } else {
         parentContext = containerContext;
       }
 
@@ -958,8 +964,7 @@ public class StreamingContainer extends YarnContainerMain
             deployBufferServerPublisher(connIdentifier, streamCodec, checkpointWindowId, queueCapacity, nodi);
           newStreams.put(sourceIdentifier, deployBufferServerPublisher.getValue());
           node.connectOutputPort(nodi.portName, deployBufferServerPublisher.getValue().component);
-        }
-        else {
+        } else {
           /*
            * In this case we have 2 possibilities, either we have 1 inline or multiple streams.
            * Since we cannot tell at this point, we assume that we will have multiple streams and
@@ -1001,12 +1006,11 @@ public class StreamingContainer extends YarnContainerMain
               String sinkIdentifier = pair.context.getSinkId();
               if (sinkIdentifier == null) {
                 pair.context.setSinkId(deployBufferServerPublisher.getKey());
-              }
-              else {
+              } else {
                 pair.context.setSinkId(sinkIdentifier.concat(", ").concat(deployBufferServerPublisher.getKey()));
               }
 
-              ((Stream.MultiSinkCapableStream) pair.component).setSink(deployBufferServerPublisher.getKey(), deployBufferServerPublisher.getValue().component);
+              ((Stream.MultiSinkCapableStream)pair.component).setSink(deployBufferServerPublisher.getKey(), deployBufferServerPublisher.getValue().component);
             }
           }
         }
@@ -1069,8 +1073,7 @@ public class StreamingContainer extends YarnContainerMain
         if (ndi.checkpoint.windowId < smallestCheckpointedWindowId) {
           smallestCheckpointedWindowId = ndi.checkpoint.windowId;
         }
-      }
-      else {
+      } else {
         Node<?> node = nodes.get(ndi.id);
 
         for (OperatorDeployInfo.InputDeployInfo nidi : ndi.inputs) {
@@ -1120,7 +1123,7 @@ public class StreamingContainer extends YarnContainerMain
             BufferServerSubscriber subscriber = fastPublisherSubscriber
               ? new FastSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity)
               : new BufferServerSubscriber("tcp://".concat(nidi.bufferServerHost).concat(":").concat(String.valueOf(nidi.bufferServerPort)).concat("/").concat(connIdentifier), queueCapacity);
-            if(streamCodec instanceof StreamCodecWrapperForPersistance) {
+            if (streamCodec instanceof StreamCodecWrapperForPersistance) {
               subscriber.acquireReservoirForPersistStream(sinkIdentifier, queueCapacity, streamCodec);
             }
             SweepableReservoir reservoir = subscriber.acquireReservoir(sinkIdentifier, queueCapacity);
@@ -1131,8 +1134,7 @@ public class StreamingContainer extends YarnContainerMain
 
             newStreams.put(sinkIdentifier, new ComponentContextPair<Stream, StreamContext>(subscriber, context));
             logger.debug("put input stream {} against key {}", subscriber, sinkIdentifier);
-          }
-          else {
+          } else {
             assert (nidi.locality == Locality.CONTAINER_LOCAL || nidi.locality == Locality.THREAD_LOCAL);
             /* we are still dealing with the MuxStream originating at the output of the source port */
             StreamContext inlineContext = new StreamContext(nidi.declaredStreamId);
@@ -1149,7 +1151,7 @@ public class StreamingContainer extends YarnContainerMain
 
                 stream = new InlineStream(queueCapacity);
                 if (checkpoint.windowId >= 0) {
-                  node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, (SweepableReservoir) stream, checkpoint.windowId));
+                  node.connectInputPort(nidi.portName, new WindowIdActivatedReservoir(sinkIdentifier, (SweepableReservoir)stream, checkpoint.windowId));
                 }
                 break;
 
@@ -1162,7 +1164,7 @@ public class StreamingContainer extends YarnContainerMain
                 throw new IllegalStateException("Locality can be either ContainerLocal or ThreadLocal");
             }
 
-            node.connectInputPort(nidi.portName, (SweepableReservoir) stream);
+            node.connectInputPort(nidi.portName, (SweepableReservoir)stream);
             newStreams.put(sinkIdentifier, new ComponentContextPair<Stream, StreamContext>(stream, inlineContext));
 
             if (!(pair.component instanceof Stream.MultiSinkCapableStream)) {
@@ -1187,27 +1189,26 @@ public class StreamingContainer extends YarnContainerMain
             if (streamCodec instanceof StreamCodecWrapperForPersistance) {
               PartitionAwareSinkForPersistence pas;
               if (nidi.partitionKeys == null) {
-                pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionMask, stream);
+                pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>)streamCodec, nidi.partitionMask, stream);
               } else {
-                pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>) streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
+                pas = new PartitionAwareSinkForPersistence((StreamCodecWrapperForPersistance<Object>)streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
               }
-              ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, pas);
+              ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas);
             } else if (nidi.partitionKeys == null || nidi.partitionKeys.isEmpty()) {
-              ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, stream);
+              ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, stream);
             } else {
               /*
                * generally speaking we do not have partitions on the inline streams so the control should not
                * come here but if it comes, then we are ready to handle it using the partition aware streams.
                */
-              PartitionAwareSink<Object> pas = new PartitionAwareSink<Object>(streamCodec == null ? nonSerializingStreamCodec : (StreamCodec<Object>) streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
-              ((Stream.MultiSinkCapableStream) pair.component).setSink(sinkIdentifier, pas);
+              PartitionAwareSink<Object> pas = new PartitionAwareSink<Object>(streamCodec == null ? nonSerializingStreamCodec : (StreamCodec<Object>)streamCodec, nidi.partitionKeys, nidi.partitionMask, stream);
+              ((Stream.MultiSinkCapableStream)pair.component).setSink(sinkIdentifier, pas);
             }
 
             String streamSinkId = pair.context.getSinkId();
             if (streamSinkId == null) {
               pair.context.setSinkId(sinkIdentifier);
-            }
-            else {
+            } else {
               pair.context.setSinkId(streamSinkId.concat(", ").concat(sinkIdentifier));
             }
           }
@@ -1275,7 +1276,7 @@ public class StreamingContainer extends YarnContainerMain
     windowGenerator.setWindowWidth(windowWidthMillis);
 
     long windowCount = WindowGenerator.getWindowCount(millisAtFirstWindow, firstWindowMillis, windowWidthMillis);
-    windowGenerator.setCheckpointCount(checkpointWindowCount, (int) (windowCount % checkpointWindowCount));
+    windowGenerator.setCheckpointCount(checkpointWindowCount, (int)(windowCount % checkpointWindowCount));
     return windowGenerator;
   }
 
@@ -1320,8 +1321,7 @@ public class StreamingContainer extends YarnContainerMain
     final Node<?> node = nodes.get(ndi.id);
     if (node == null) {
       logger.warn("node {}/{} took longer to exit, resulting in unclean undeploy!", ndi.id, ndi.name);
-    }
-    else {
+    } else {
       eventBus.publish(new NodeDeactivationEvent(node));
       node.deactivate();
       node.teardown();
@@ -1378,8 +1378,7 @@ public class StreamingContainer extends YarnContainerMain
             }
 
             node.run(); /* this is a blocking call */
-          }
-          catch (Error error) {
+          } catch (Error error) {
             int[] operators;
             if (currentdi == null) {
               logger.error("Voluntary container termination due to an error in operator set {}.", setOperators, error);
@@ -1388,39 +1387,33 @@ public class StreamingContainer extends YarnContainerMain
               for (Iterator<OperatorDeployInfo> it = setOperators.iterator(); it.hasNext(); i++) {
                 operators[i] = it.next().id;
               }
-            }
-            else {
+            } else {
               logger.error("Voluntary container termination due to an error in operator {}.", currentdi, error);
               operators = new int[]{currentdi.id};
             }
             umbilical.reportError(containerId, operators, "Voluntary container termination due to an error. " + ExceptionUtils.getStackTrace(error));
             System.exit(1);
-          }
-          catch (Exception ex) {
+          } catch (Exception ex) {
             if (currentdi == null) {
               failedNodes.add(ndi.id);
               logger.error("Operator set {} stopped running due to an exception.", setOperators, ex);
               int[] operators = new int[]{ndi.id};
               umbilical.reportError(containerId, operators, "Stopped running due to an exception. " + ExceptionUtils.getStackTrace(ex));
-            }
-            else {
+            } else {
               failedNodes.add(currentdi.id);
               logger.error("Abandoning deployment of operator {} due to setup failure.", currentdi, ex);
               int[] operators = new int[]{currentdi.id};
               umbilical.reportError(containerId, operators, "Abandoning deployment due to setup failure. " + ExceptionUtils.getStackTrace(ex));
             }
-          }
-          finally {
+          } finally {
             if (setOperators.contains(ndi)) {
               try {
                 teardownNode(ndi);
-              }
-              catch (Exception ex) {
+              } catch (Exception ex) {
                 failedNodes.add(ndi.id);
                 logger.error("Shutdown of operator {} failed due to an exception.", ndi, ex);
               }
-            }
-            else {
+            } else {
               signal.countDown();
             }
 
@@ -1431,13 +1424,11 @@ public class StreamingContainer extends YarnContainerMain
                 if (setOperators.contains(oiodi)) {
                   try {
                     teardownNode(oiodi);
-                  }
-                  catch (Exception ex) {
+                  } catch (Exception ex) {
                     failedNodes.add(oiodi.id);
                     logger.error("Shutdown of operator {} failed due to an exception.", oiodi, ex);
                   }
-                }
-                else {
+                } else {
                   signal.countDown();
                 }
               }
@@ -1453,8 +1444,7 @@ public class StreamingContainer extends YarnContainerMain
      */
     try {
       signal.await();
-    }
-    catch (InterruptedException ex) {
+    } catch (InterruptedException ex) {
       logger.debug("Activation of operators interruped.", ex);
     }
 
@@ -1509,17 +1499,16 @@ public class StreamingContainer extends YarnContainerMain
       if (temp == null) {
         temp = containerContext.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
       }
-      int appWindowCount = (int) (windowCount % temp);
+      int appWindowCount = (int)(windowCount % temp);
 
       temp = ndi.contextAttributes.get(OperatorContext.CHECKPOINT_WINDOW_COUNT);
       if (temp == null) {
         temp = containerContext.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT);
       }
-      int lCheckpointWindowCount = (int) (windowCount % temp);
+      int lCheckpointWindowCount = (int)(windowCount % temp);
       checkpoint = new Checkpoint(WindowGenerator.getWindowId(now, firstWindowMillis, windowWidthMillis), appWindowCount, lCheckpointWindowCount);
       logger.debug("using {} on {} at {}", ProcessingMode.AT_MOST_ONCE, ndi.name, checkpoint);
-    }
-    else {
+    } else {
       checkpoint = ndi.checkpoint;
       logger.debug("using {} on {} at {}", ndi.contextAttributes == null ? ProcessingMode.AT_LEAST_ONCE :
         (ndi.contextAttributes.get(OperatorContext.PROCESSING_MODE) == null ? ProcessingMode.AT_LEAST_ONCE :
@@ -1535,8 +1524,7 @@ public class StreamingContainer extends YarnContainerMain
       for (Component<ContainerContext> c : components) {
         c.setup(ctx);
       }
-    }
-    else {
+    } else {
       for (Component<ContainerContext> c : components) {
         c.teardown();
       }


[6/9] incubator-apex-core git commit: APEX-250 #resolve #comment added checkstyle suppressions file to exclude DTCli from System.out and System.err checks

Posted by vr...@apache.org.
APEX-250 #resolve #comment added checkstyle suppressions file to exclude DTCli from System.out and System.err checks


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/82fd8c36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/82fd8c36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/82fd8c36

Branch: refs/heads/feature-module
Commit: 82fd8c36db74981e8a57f3c362a8912466755706
Parents: f35522b
Author: Chandni Singh <cs...@apache.org>
Authored: Fri Nov 6 00:28:30 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Nov 6 00:36:01 2015 -0800

----------------------------------------------------------------------
 checkstyle-suppressions.xml | 28 ++++++++++++++++++++++++++++
 engine/pom.xml              |  2 +-
 pom.xml                     |  2 ++
 3 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/82fd8c36/checkstyle-suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle-suppressions.xml b/checkstyle-suppressions.xml
new file mode 100644
index 0000000..7496011
--- /dev/null
+++ b/checkstyle-suppressions.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<!DOCTYPE suppressions PUBLIC
+  "-//Puppy Crawl//DTD Suppressions 1.0//EN"
+  "http://www.puppycrawl.com/dtds/suppressions_1_0.dtd">
+
+<suppressions>
+  <suppress checks="RegexpMultiline" files="DTCli.java"/>
+</suppressions>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/82fd8c36/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 1513728..791016d 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>2072</maxAllowedViolations>
+          <maxAllowedViolations>2020</maxAllowedViolations>
         </configuration>
       </plugin>
     </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/82fd8c36/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index ff917ff..89abc04 100644
--- a/pom.xml
+++ b/pom.xml
@@ -323,6 +323,8 @@
           </executions>
           <configuration>
             <configLocation>apex_checks.xml</configLocation>
+            <suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
+            <suppressionsFileExpression>checkstyle.suppressions.file</suppressionsFileExpression>
           </configuration>
         </plugin>
         <plugin>


[3/9] incubator-apex-core git commit: Merge branch 'StyleFixesStreamingContainer' of https://github.com/ilganeli/incubator-apex-core into devel-3

Posted by vr...@apache.org.
Merge branch 'StyleFixesStreamingContainer' of https://github.com/ilganeli/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b1af7240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b1af7240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b1af7240

Branch: refs/heads/feature-module
Commit: b1af72403ea37d087d3b9504b328d2da876d6e0d
Parents: b4b7c07 b0aab1a
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 2 13:53:16 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 2 13:53:16 2015 -0800

----------------------------------------------------------------------
 .../stram/engine/StreamingContainer.java        | 248 +++++++++----------
 1 file changed, 118 insertions(+), 130 deletions(-)
----------------------------------------------------------------------



[8/9] incubator-apex-core git commit: Merge branch 'APEX-250' of github.com:chandnisingh/incubator-apex-core into devel-3

Posted by vr...@apache.org.
Merge branch 'APEX-250' of github.com:chandnisingh/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b3863b24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b3863b24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b3863b24

Branch: refs/heads/feature-module
Commit: b3863b24b6c58ed8cb8bd63c6ba660f4743315c8
Parents: 29a4e22 82fd8c3
Author: David Yan <da...@datatorrent.com>
Authored: Fri Nov 6 14:37:28 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Nov 6 14:37:28 2015 -0800

----------------------------------------------------------------------
 checkstyle-suppressions.xml | 28 ++++++++++++++++++++++++++++
 engine/pom.xml              |  2 +-
 pom.xml                     |  2 ++
 3 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/9] incubator-apex-core git commit: Skipping endWindow and operator is shutdown prematurely. APEX-58 #resolve

Posted by vr...@apache.org.
Skipping endWindow and operator is shutdown prematurely. APEX-58 #resolve


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/18d43731
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/18d43731
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/18d43731

Branch: refs/heads/feature-module
Commit: 18d437315689aeea32c391a9670f7fc17554fed2
Parents: 8d9aa4b
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Oct 27 18:22:32 2015 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Mon Nov 2 10:41:47 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/engine/GenericNode.java   |  6 +-
 .../java/com/datatorrent/stram/engine/Node.java | 33 ++++++--
 .../stram/engine/GenericNodeTest.java           | 85 ++++++++++++++++++++
 3 files changed, 118 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
index 3902f37..26ba98a 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/GenericNode.java
@@ -553,7 +553,11 @@ public class GenericNode extends Node<Operator>
       }
     }
 
-    if (insideWindow) {
+    /**
+     * TODO: If shutdown is requested while inside a window, provide an alternate way of notifying the operator,
+     * TODO: such as a listener callback
+     */
+    if (insideWindow && !shutdown) {
       endWindowEmitTime = System.currentTimeMillis();
       operator.endWindow();
       if (++applicationWindowCount == APPLICATION_WINDOW_COUNT) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index b073dcd..c66df12 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -28,25 +28,44 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.hadoop.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.util.ReflectionUtils;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.math.IntMath;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.AutoMetric;
+import com.datatorrent.api.Component;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.InputPort;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Operator.ProcessingMode;
 import com.datatorrent.api.Operator.Unifier;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.StatsListener.OperatorRequest;
-
+import com.datatorrent.api.StorageAgent;
 import com.datatorrent.bufferserver.util.Codec;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.Pair;
@@ -297,6 +316,10 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
       logger.warn("Shutdown requested when context is not available!");
     }
     else {
+      /*
+       * Since alive is non-volatile this code explicitly unsets it in the operator lifecycle thread thereby notifying
+       * it even when the thread is reading it from the cache
+       */
       context.request(new OperatorRequest()
       {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/18d43731/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index 9e62ac5..d5ceae6 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -211,4 +211,89 @@ public class GenericNodeTest
     Assert.assertEquals(Thread.State.TERMINATED, t.getState());
   }
 
+  @Test
+  public void testPrematureTermination() throws InterruptedException
+  {
+    long maxSleep = 5000;
+    long sleeptime = 25L;
+    GenericOperator go = new GenericOperator();
+    final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, new DefaultAttributeMap(), null));
+    gn.setId(1);
+    DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
+    DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
+
+    gn.connectInputPort("ip1", reservoir1);
+    gn.connectInputPort("ip2", reservoir2);
+    gn.connectOutputPort("op", Sink.BLACKHOLE);
+
+    final AtomicBoolean ab = new AtomicBoolean(false);
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        ab.set(true);
+        gn.activate();
+        gn.run();
+        gn.deactivate();
+      }
+
+    };
+    t.start();
+
+    long interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((ab.get() == false) && (interval < maxSleep));
+
+
+    int controlTupleCount = gn.controlTupleCount;
+    Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
+
+    reservoir1.add(beginWindow1);
+    reservoir2.add(beginWindow1);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+    Assert.assertTrue("Begin window called", go.endWindowId != go.beginWindowId);
+    controlTupleCount = gn.controlTupleCount;
+
+    Tuple endWindow1 = new EndWindowTuple(0x1L);
+
+    reservoir1.add(endWindow1);
+    reservoir2.add(endWindow1);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+    Assert.assertTrue("End window called", go.endWindowId == go.beginWindowId);
+    controlTupleCount = gn.controlTupleCount;
+
+    Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 0x2L);
+
+    reservoir1.add(beginWindow2);
+    reservoir2.add(beginWindow2);
+
+    interval = 0;
+    do {
+      Thread.sleep(sleeptime);
+      interval += sleeptime;
+    }
+    while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
+
+    gn.shutdown();
+    t.join();
+
+    Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
+  }
+
 }


[7/9] incubator-apex-core git commit: misc typo fix

Posted by vr...@apache.org.
misc typo fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/29a4e223
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/29a4e223
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/29a4e223

Branch: refs/heads/feature-module
Commit: 29a4e223e1944576836a4419cbd79e7e24c64cab
Parents: f35522b
Author: David Yan <da...@datatorrent.com>
Authored: Wed Nov 4 17:04:43 2015 -0800
Committer: David Yan <da...@datatorrent.com>
Committed: Fri Nov 6 10:58:55 2015 -0800

----------------------------------------------------------------------
 api/src/main/java/com/datatorrent/api/AutoMetric.java              | 2 +-
 api/src/main/java/com/datatorrent/api/Operator.java                | 2 +-
 .../main/java/com/datatorrent/stram/engine/StreamingContainer.java | 2 +-
 .../java/com/datatorrent/stram/webapp/LogicalOperatorInfo.java     | 1 -
 4 files changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29a4e223/api/src/main/java/com/datatorrent/api/AutoMetric.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/AutoMetric.java b/api/src/main/java/com/datatorrent/api/AutoMetric.java
index ddf99c6..1c1fb25 100644
--- a/api/src/main/java/com/datatorrent/api/AutoMetric.java
+++ b/api/src/main/java/com/datatorrent/api/AutoMetric.java
@@ -87,7 +87,7 @@ public @interface AutoMetric
      * <li>w - week</li>
      * <li>M - month</li>
      * <li>q - quarter</li>
-     * <li>y - year</li
+     * <li>y - year</li>
      * </ul>
      *
      * @return time buckets.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29a4e223/api/src/main/java/com/datatorrent/api/Operator.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/com/datatorrent/api/Operator.java b/api/src/main/java/com/datatorrent/api/Operator.java
index c362159..eb69266 100644
--- a/api/src/main/java/com/datatorrent/api/Operator.java
+++ b/api/src/main/java/com/datatorrent/api/Operator.java
@@ -36,7 +36,7 @@ public interface Operator extends Component<OperatorContext>
    * processed data. This is the default mode.
    * <br />
    * In AT_MOST_ONCE mode in case of failure, the operator will start with the tuples which are being sent at the time
-   * the failed operator is recovered. Unlike AT_LEAST_MOST once operator, it will not try to recover the tuples which
+   * the failed operator is recovered. Unlike AT_LEAST_ONCE operator, it will not try to recover the tuples which
    * may have arrived while operator was down. Typically you would want to mark operators AT_MOST_ONCE if it does not
    * materially impact your computation if a few tuples are omitted from the computation and the expected throughput is
    * most likely to consume all the resources available for the operator or the DAG.

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29a4e223/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 1544c16..14e00a9 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -1445,7 +1445,7 @@ public class StreamingContainer extends YarnContainerMain
     try {
       signal.await();
     } catch (InterruptedException ex) {
-      logger.debug("Activation of operators interruped.", ex);
+      logger.debug("Activation of operators interrupted.", ex);
     }
 
     for (ComponentContextPair<Stream, StreamContext> pair : newStreams.values()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/29a4e223/engine/src/main/java/com/datatorrent/stram/webapp/LogicalOperatorInfo.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/webapp/LogicalOperatorInfo.java b/engine/src/main/java/com/datatorrent/stram/webapp/LogicalOperatorInfo.java
index e001f5a..9d8d648 100644
--- a/engine/src/main/java/com/datatorrent/stram/webapp/LogicalOperatorInfo.java
+++ b/engine/src/main/java/com/datatorrent/stram/webapp/LogicalOperatorInfo.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.stram.webapp;
 
-import com.datatorrent.stram.appdata.AppDataPushAgent;
 import java.util.*;
 import javax.xml.bind.annotation.*;
 import org.apache.commons.lang3.mutable.MutableInt;


[5/9] incubator-apex-core git commit: Adjust checkstyle maxAllowedViolations.

Posted by vr...@apache.org.
Adjust checkstyle maxAllowedViolations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/f35522b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/f35522b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/f35522b5

Branch: refs/heads/feature-module
Commit: f35522b5ac2426967f916c627fe57ae823f03a3b
Parents: b6d80ca
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 2 17:45:51 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 2 17:45:51 2015 -0800

----------------------------------------------------------------------
 engine/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/f35522b5/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index a915ff2..1513728 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -145,7 +145,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>2159</maxAllowedViolations>
+          <maxAllowedViolations>2072</maxAllowedViolations>
         </configuration>
       </plugin>
     </plugins>


[4/9] incubator-apex-core git commit: Merge branch 'APEX-58' of https://github.com/PramodSSImmaneni/incubator-apex-core into devel-3

Posted by vr...@apache.org.
Merge branch 'APEX-58' of https://github.com/PramodSSImmaneni/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b6d80ca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b6d80ca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b6d80ca5

Branch: refs/heads/feature-module
Commit: b6d80ca52a94ae1ae10f06f79c55a126e5915fb5
Parents: b1af724 18d4373
Author: Thomas Weise <th...@datatorrent.com>
Authored: Mon Nov 2 17:32:53 2015 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Mon Nov 2 17:32:53 2015 -0800

----------------------------------------------------------------------
 .../datatorrent/stram/engine/GenericNode.java   |  6 +-
 .../java/com/datatorrent/stram/engine/Node.java | 33 ++++++--
 .../stram/engine/GenericNodeTest.java           | 85 ++++++++++++++++++++
 3 files changed, 118 insertions(+), 6 deletions(-)
----------------------------------------------------------------------



[9/9] incubator-apex-core git commit: Merge branch 'devel-3' of https://github.com/apache/incubator-apex-core into feature-module

Posted by vr...@apache.org.
Merge branch 'devel-3' of https://github.com/apache/incubator-apex-core into feature-module


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/b85de3d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/b85de3d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/b85de3d7

Branch: refs/heads/feature-module
Commit: b85de3d7b73b1219a1243b152cfdfc8ae62a188d
Parents: 013ccbee b3863b2
Author: Vlad Rozov <v....@datatorrent.com>
Authored: Mon Nov 9 08:32:11 2015 -0800
Committer: Vlad Rozov <v....@datatorrent.com>
Committed: Mon Nov 9 08:32:11 2015 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/api/AutoMetric.java    |   2 +-
 .../main/java/com/datatorrent/api/Operator.java |   2 +-
 checkstyle-suppressions.xml                     |  28 +++
 engine/pom.xml                                  |   2 +-
 .../datatorrent/stram/engine/GenericNode.java   |   6 +-
 .../java/com/datatorrent/stram/engine/Node.java |  33 ++-
 .../stram/engine/StreamingContainer.java        | 250 +++++++++----------
 .../stram/webapp/LogicalOperatorInfo.java       |   1 -
 .../stram/engine/GenericNodeTest.java           |  85 +++++++
 pom.xml                                         |   2 +
 10 files changed, 270 insertions(+), 141 deletions(-)
----------------------------------------------------------------------