You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 08:22:00 UTC
[41/50] incubator-apex-core git commit: Fix rawtype warnings.
Fix rawtype warnings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/d0b4bdb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/d0b4bdb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/d0b4bdb2
Branch: refs/heads/master
Commit: d0b4bdb24e77a08181e07f13d7c5794d39c6203c
Parents: c284c39
Author: Thomas Weise <th...@datatorrent.com>
Authored: Wed Jan 20 10:57:54 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Wed Jan 20 10:59:36 2016 -0800
----------------------------------------------------------------------
.../stram/plan/physical/PhysicalPlan.java | 24 +++++++++++---------
1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/d0b4bdb2/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index 7858ea0..829a6fd 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -206,10 +206,11 @@ public class PhysicalPlan implements Serializable
p.statsListeners = this.statsHandlers;
}
+ /**
+ * Return all partitions and unifiers, except MxN unifiers
+ * @return
+ */
private Collection<PTOperator> getAllOperators() {
-// if (partitions.size() == 1) {
-// return Collections.singletonList(partitions.get(0));
-// }
Collection<PTOperator> c = new ArrayList<PTOperator>(partitions.size() + 1);
c.addAll(partitions);
for (StreamMapping ug : outputStreams.values()) {
@@ -391,7 +392,7 @@ public class PhysicalPlan implements Serializable
for (StreamMeta s : n.getOutputStreams().values()) {
if (s.getPersistOperator() != null) {
InputPortMeta persistInputPort = s.getPersistOperatorInputPort();
- StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+ StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
if (persistCodec == null)
continue;
// Logging is enabled for the stream
@@ -403,10 +404,10 @@ public class PhysicalPlan implements Serializable
// Check partitioning for persist operators per sink too
for (Entry<InputPortMeta, InputPortMeta> entry : s.sinkSpecificPersistInputPortMap.entrySet()) {
InputPortMeta persistInputPort = entry.getValue();
- StreamCodec codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
+ StreamCodec<?> codec = persistInputPort.getAttributes().get(PortContext.STREAM_CODEC);
if (codec != null) {
if (codec instanceof StreamCodecWrapperForPersistance) {
- StreamCodecWrapperForPersistance persistCodec = (StreamCodecWrapperForPersistance) codec;
+ StreamCodecWrapperForPersistance<?> persistCodec = (StreamCodecWrapperForPersistance<?>) codec;
updatePersistOperatorWithSinkPartitions(persistInputPort, s.sinkSpecificPersistOperatorMap.get(entry.getKey()), persistCodec, entry.getKey());
}
}
@@ -418,7 +419,7 @@ public class PhysicalPlan implements Serializable
}
}
- private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance persistCodec, InputPortMeta sinkPortMeta)
+ private void updatePersistOperatorWithSinkPartitions(InputPortMeta persistInputPort, OperatorMeta persistOperatorMeta, StreamCodecWrapperForPersistance<?> persistCodec, InputPortMeta sinkPortMeta)
{
Collection<PTOperator> ptOperators = getOperators(sinkPortMeta.getOperatorWrapper());
Collection<PartitionKeys> partitionKeysList = new ArrayList<PartitionKeys>();
@@ -437,7 +438,7 @@ public class PhysicalPlan implements Serializable
for (OperatorMeta n : dag.getAllOperators()) {
for (StreamMeta s : n.getOutputStreams().values()) {
if (s.getPersistOperator() != null) {
- Map<InputPortMeta, StreamCodec<Object>> inputStreamCodecs = new HashMap<InputPortMeta, StreamCodec<Object>>();
+ Map<InputPortMeta, StreamCodec<?>> inputStreamCodecs = new HashMap<>();
// Logging is enabled for the stream
for (InputPortMeta portMeta : s.getSinksToPersist()) {
InputPort<?> port = portMeta.getPortObject();
@@ -452,7 +453,7 @@ public class PhysicalPlan implements Serializable
}
}
if (!alreadyAdded) {
- inputStreamCodecs.put(portMeta, (StreamCodec<Object>) inputStreamCodec);
+ inputStreamCodecs.put(portMeta, inputStreamCodec);
}
}
}
@@ -466,8 +467,9 @@ public class PhysicalPlan implements Serializable
// Create Wrapper codec for Stream persistence using all unique
// stream codecs
// Logger should write merged or union of all input stream codecs
- StreamCodec<Object> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? (StreamCodec<Object>) s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : (StreamCodec<Object>) s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
- StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance<Object>(inputStreamCodecs, specifiedCodecForLogger);
+ StreamCodec<?> specifiedCodecForLogger = (s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) != null) ? s.getPersistOperatorInputPort().getValue(PortContext.STREAM_CODEC) : s.getPersistOperatorInputPort().getPortObject().getStreamCodec();
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ StreamCodecWrapperForPersistance<Object> codec = new StreamCodecWrapperForPersistance(inputStreamCodecs, specifiedCodecForLogger);
streamMetaToCodecMap.put(s, codec);
}
}