You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by is...@apache.org on 2016/01/20 23:30:40 UTC

[1/2] incubator-apex-core git commit: Fix rawtype warnings.

Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 c284c39cd -> d0908e4bc


Fix rawtype warnings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d0b4bdb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d0b4bdb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d0b4bdb2

Branch: refs/heads/devel-3
Commit: d0b4bdb24e77a08181e07f13d7c5794d39c6203c
Parents: c284c39
Author: Thomas Weise <th...@datatorrent.com>
Authored: Wed Jan 20 10:57:54 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Wed Jan 20 10:59:36 2016 -0800

----------------------------------------------------------------------
 .../stram/plan/physical/PhysicalPlan.java       | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d0b4bdb2/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 7858ea0..829a6fd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -206,10 +206,11 @@ public class PhysicalPlan implements Serializable
       p.statsListeners = this.statsHandlers;
     }
 
+    /**
+     * Return all partitions and unifiers, except MxN unifiers
+     * @return
+     */
     private Collection<PTOperator> getAllOperators() {
-//      if (partitions.size() == 1) {
-//        return Collections.singletonList(partitions.get(0));
-//      }
       Collection<PTOperator> c = new ArrayList<PTOperator>(partitions.size() + 1);
       c.addAll(partitions);
       for (StreamMapping ug : outputStreams.values()) {
@@ -391,7 +392,7 @@ public class PhysicalPlan implements Serializable
         for (StreamMeta s : n.getOutputStreams().values()) {
           if (s.getPersistOperator() != null) {
             InputPortMeta persistInputPort = s.getPersistOperatorInputPort();
-            StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+            StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
             if (persistCodec == null)
               continue;
             // Logging is enabled for the stream
@@ -403,10 +404,10 @@ public class PhysicalPlan implements Serializable
           // Check partitioning for persist operators per sink too
           for (Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
             InputPortMeta persistInputPort = entry.getValue();
-            StreamCodec codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+            StreamCodec<?> codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
             if (codec != null) {
               if (codec instanceof StreamCodecWrapperForPersistance) {
-                StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) codec;
+                StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) codec;
                 updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey());
               }
             }
@@ -418,7 +419,7 @@ public class PhysicalPlan implements Serializable
     }
   }
 
-  private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance persistCodec, InputPortMeta sinkPortMeta)
+  private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance<?> persistCodec, InputPortMeta sinkPortMeta)
   {
     Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper());
     Collection<PartitionKeys> partitionKeysList = new ArrayList<PartitionKeys>();
@@ -437,7 +438,7 @@ public class PhysicalPlan implements Serializable
       for (OperatorMeta n : dag.getAllOperators()) {
         for (StreamMeta s : n.getOutputStreams().values()) {
           if (s.getPersistOperator() != null) {
-            Map<InputPortMeta, StreamCodec<Object>> inputStreamCodecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
+            Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>();
             // Logging is enabled for the stream
             for (InputPortMeta portMeta : s.getSinksToPersist()) {
               InputPort<?> port = portMeta.getPortObject();
@@ -452,7 +453,7 @@ public class PhysicalPlan implements Serializable
                   }
                 }
                 if (!alreadyAdded) {
-                  inputStreamCodecs.put(portMeta, (StreamCodec<Object>) inputStreamCodec);
+                  inputStreamCodecs.put(portMeta, inputStreamCodec);
                 }
               }
             }
@@ -466,8 +467,9 @@ public class PhysicalPlan implements Serializable
               // Create Wrapper codec for Stream persistence using all unique
               // stream codecs
               // Logger should write merged or union of all input stream codecs
-              StreamCodec<Object> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
-              StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(inputStreamCodecs, specifiedCodecForLogger);
+              StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
+              @SuppressWarnings({ "unchecked", "rawtypes" })
+              StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger);
               streamMetaToCodecMap.put(s, codec);
             }
           }


[2/2] incubator-apex-core git commit: Merge branch 'fix-warnings' of https://github.com/tweise/incubator-apex-core into devel-3

Posted by is...@apache.org.
Merge branch 'fix-warnings' of https://github.com/tweise/incubator-apex-core into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d0908e4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d0908e4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d0908e4b

Branch: refs/heads/devel-3
Commit: d0908e4bc226216805975ade997469dcd65b0b42
Parents: c284c39 d0b4bdb
Author: ishark <is...@datatorrent.com>
Authored: Wed Jan 20 13:57:56 2016 -0800
Committer: ishark <is...@datatorrent.com>
Committed: Wed Jan 20 13:57:56 2016 -0800

----------------------------------------------------------------------
 .../stram/plan/physical/PhysicalPlan.java       | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------