You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mj...@apache.org on 2015/10/06 13:31:37 UTC
[12/15] flink git commit: [Storm Compatibility] Maven module
restucturing and cleanup - removed storm-parent;
renamed storm-core and storm-examples - updated internal Java package
structure * renamed package "stormcompatibility" to "storm" *
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 30227b8..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/api/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.api;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
- * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams.
- *
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
- private static final long serialVersionUID = 4672434660037669254L;
-
- private final ArrayKeySelector<Tuple> selector;
-
- public SplitStreamTypeKeySelector(int... fields) {
- this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
- }
-
- @Override
- public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
- return selector.getKey(value.value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
deleted file mode 100644
index 114fa7c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FiniteStormSpout.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.stormcompatibility.wrappers.FiniteStormSpoutWrapper;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * This interface represents a Storm spout that emits a finite number of records. Common Storm
- * spouts emit infinite streams by default. To change this behaviour and take advantage of
- * Flink's finite-source capabilities, the spout should implement this interface. To wrap
- * {@link FiniteStormSpout} separately, use {@link FiniteStormSpoutWrapper}.
- */
-public interface FiniteStormSpout extends IRichSpout {
-
- /**
- * When returns true, the spout has reached the end of the stream.
- *
- * @return true, if the spout's stream reached its end, false otherwise
- */
- public boolean reachedEnd();
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
deleted file mode 100644
index 3eee8d6..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
- * {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt bolt}.<br />
- * <br />
- * <strong>CAUTION: Flink does not support direct emit.</strong>
- */
-public final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
- /** The declared output streams and schemas. */
- public final HashMap<String, Fields> outputStreams = new HashMap<String, Fields>();
-
- @Override
- public void declare(final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
- }
-
- /**
- * {@inheritDoc}
- * <p/>
- * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}.
- *
- * @throws UnsupportedOperationException
- * if {@code direct} is {@code true}
- */
- @Override
- public void declare(final boolean direct, final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
- }
-
- @Override
- public void declareStream(final String streamId, final Fields fields) {
- this.declareStream(streamId, false, fields);
- }
-
- /**
- * {@inheritDoc}
- * <p/>
- * Direct emit is no supported by Flink. Parameter {@code direct} must be {@code false}.
- *
- * @throws UnsupportedOperationException
- * if {@code direct} is {@code true}
- */
- @Override
- public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if (direct) {
- throw new UnsupportedOperationException("Direct emit is not supported by Flink");
- }
-
- this.outputStreams.put(streamId, fields);
- }
-
- /**
- * Returns {@link TypeInformation} for the declared output schema for a specific stream.
- *
- * @param streamId
- * A stream ID.
- *
- * @return output type information for the declared output schema of the specified stream; or {@code null} if
- * {@code streamId == null}
- *
- * @throws IllegalArgumentException
- * If no output schema was declared for the specified stream or if more then 25 attributes got declared.
- */
- public TypeInformation<?> getOutputType(final String streamId) throws IllegalArgumentException {
- if (streamId == null) {
- return null;
- }
-
- Fields outputSchema = this.outputStreams.get(streamId);
- if (outputSchema == null) {
- throw new IllegalArgumentException("Stream with ID '" + streamId
- + "' was not declared.");
- }
-
- Tuple t;
- final int numberOfAttributes = outputSchema.size();
-
- if (numberOfAttributes == 1) {
- return TypeExtractor.getForClass(Object.class);
- } else if (numberOfAttributes <= 25) {
- try {
- t = Tuple.getTupleClass(numberOfAttributes).newInstance();
- } catch (final InstantiationException e) {
- throw new RuntimeException(e);
- } catch (final IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- } else {
- throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
- }
-
- // TODO: declare only key fields as DefaultComparable
- for (int i = 0; i < numberOfAttributes; ++i) {
- t.setField(new DefaultComparable(), i);
- }
-
- return TypeExtractor.getForObject(t);
- }
-
- /**
- * {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
- * TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
- * Flink cannot use them and will throw an exception.
- */
- private static class DefaultComparable implements Comparable<DefaultComparable> {
-
- public DefaultComparable() {
- }
-
- @Override
- public int compareTo(final DefaultComparable o) {
- return 0;
- }
- }
-
- /**
- * Computes the indexes within the declared output schema of the specified stream, for a list of given
- * field-grouping attributes.
- *
- * @param streamId
- * A stream ID.
- * @param groupingFields
- * The names of the key fields.
- *
- * @return array of {@code int}s that contains the index within the output schema for each attribute in the given
- * list
- */
- public int[] getGroupingFieldIndexes(final String streamId, final List<String> groupingFields) {
- final int[] fieldIndexes = new int[groupingFields.size()];
-
- for (int i = 0; i < fieldIndexes.length; ++i) {
- fieldIndexes[i] = this.outputStreams.get(streamId).fieldIndex(groupingFields.get(i));
- }
-
- return fieldIndexes;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
deleted file mode 100644
index 7e60a87..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkStormStreamSelector.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.stormcompatibility.api.FlinkTopologyBuilder;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-/**
- * Used by {@link FlinkTopologyBuilder} to split multiple declared output streams within Flink.
- */
-public final class FlinkStormStreamSelector<T> implements OutputSelector<SplitStreamType<T>> {
- private static final long serialVersionUID = 2553423379715401023L;
-
- /** internal cache to avoid short living ArrayList objects. */
- private final HashMap<String, List<String>> streams = new HashMap<String, List<String>>();
-
- @Override
- public Iterable<String> select(SplitStreamType<T> value) {
- String sid = value.streamId;
- List<String> streamId = this.streams.get(sid);
- if (streamId == null) {
- streamId = new ArrayList<String>(1);
- streamId.add(sid);
- this.streams.put(sid, streamId);
- }
- return streamId;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
deleted file mode 100644
index 14af830..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/FlinkTopologyContext.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.util;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Fields;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import clojure.lang.Atom;
-
-/**
- * {@link FlinkTopologyContext} is a {@link TopologyContext} that overwrites certain method that are not applicable when
- * a Storm topology is executed within Flink.
- */
-public final class FlinkTopologyContext extends TopologyContext {
-
- /**
- * Instantiates a new {@link FlinkTopologyContext} for a given Storm topology. The context object is instantiated
- * for each parallel task
- */
- public FlinkTopologyContext(final StormTopology topology, @SuppressWarnings("rawtypes") final Map stormConf,
- final Map<Integer, String> taskToComponent, final Map<String, List<Integer>> componentToSortedTasks,
- final Map<String, Map<String, Fields>> componentToStreamToFields, final String stormId, final String codeDir,
- final String pidDir, final Integer taskId, final Integer workerPort, final List<Integer> workerTasks,
- final Map<String, Object> defaultResources, final Map<String, Object> userResources,
- final Map<String, Object> executorData, @SuppressWarnings("rawtypes") final Map registeredMetrics,
- final Atom openOrPrepareWasCalled) {
- super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId,
- codeDir, pidDir, taskId, workerPort, workerTasks, defaultResources, userResources, executorData,
- registeredMetrics, openOrPrepareWasCalled);
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public void addTaskHook(final ITaskHook hook) {
- throw new UnsupportedOperationException("Task hooks are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public Collection<ITaskHook> getHooks() {
- throw new UnsupportedOperationException("Task hooks are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public IMetric getRegisteredMetricByName(final String name) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
-
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @SuppressWarnings("rawtypes")
- @Override
- public CombinedMetric registerMetric(final String name, final ICombiner combiner, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @SuppressWarnings("rawtypes")
- @Override
- public ReducedMetric registerMetric(final String name, final IReducer combiner, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @SuppressWarnings("unchecked")
- @Override
- public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
- throw new UnsupportedOperationException("Metrics are not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public <T extends ISubscribedState> T setAllSubscribedState(final T obj) {
- throw new UnsupportedOperationException("Not supported by Flink");
-
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public <T extends ISubscribedState> T setSubscribedState(final String componentId, final T obj) {
- throw new UnsupportedOperationException("Not supported by Flink");
- }
-
- /**
- * Not supported by Flink.
- *
- * @throws UnsupportedOperationException
- * at every invocation
- */
- @Override
- public <T extends ISubscribedState> T setSubscribedState(final String componentId, final String streamId, final T
- obj) {
- throw new UnsupportedOperationException("Not supported by Flink");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
deleted file mode 100644
index 9cb44ec..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamMapper.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.SplitStream;
-
-/**
- * Strips {@link SplitStreamType}{@code <T>} away, ie, extracts the wrapped record of type {@code T}. Can be used to get
- * a "clean" stream from a Spout/Bolt that declared multiple output streams (after the streams got separated using
- * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} and
- * {@link SplitStream#select(String...) .select(...)}).
- *
- * @param <T>
- */
-public class SplitStreamMapper<T> implements MapFunction<SplitStreamType<T>, T> {
- private static final long serialVersionUID = 3550359150160908564L;
-
- @Override
- public T map(SplitStreamType<T> value) throws Exception {
- return value.value;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
deleted file mode 100644
index 9c7e477..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamType.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/**
- * Used by {@link org.apache.flink.stormcompatibility.wrappers.AbstractStormCollector AbstractStormCollector} to wrap
- * output tuples if multiple output streams are declared. For this case, the Flink output data stream must be split via
- * {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector) .split(...)} using
- * {@link FlinkStormStreamSelector}.
- */
-public class SplitStreamType<T> {
-
- /** The stream ID this tuple belongs to. */
- public String streamId;
- /** The actual data value. */
- public T value;
-
- @Override
- public String toString() {
- return "<sid:" + this.streamId + ",v:" + this.value + ">";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SplitStreamType<?> other = (SplitStreamType<?>) o;
-
- return this.streamId.equals(other.streamId) && this.value.equals(other.value);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
deleted file mode 100644
index 200f772..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/SplitStreamTypeKeySelector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.keys.KeySelectorUtil.ArrayKeySelector;
-
-/**
- * {@link SplitStreamTypeKeySelector} is a specific grouping key selector for streams that are selected via
- * {@link FlinkStormStreamSelector} from a Spout or Bolt that declares multiple output streams.
- *
- * It extracts the wrapped {@link Tuple} type from the {@link SplitStreamType} tuples and applies a regular
- * {@link ArrayKeySelector} on it.
- */
-public class SplitStreamTypeKeySelector implements KeySelector<SplitStreamType<Tuple>, Tuple> {
- private static final long serialVersionUID = 4672434660037669254L;
-
- private final ArrayKeySelector<Tuple> selector;
-
- public SplitStreamTypeKeySelector(int... fields) {
- this.selector = new KeySelectorUtil.ArrayKeySelector<Tuple>(fields);
- }
-
- @Override
- public Tuple getKey(SplitStreamType<Tuple> value) throws Exception {
- return selector.getKey(value.value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
deleted file mode 100644
index 6726ae8..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/util/StormConfig.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.util;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-
-import backtype.storm.Config;
-
-/**
- * {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
- * object) for embedded Spouts and Bolts.
- */
-@SuppressWarnings("rawtypes")
-public final class StormConfig extends GlobalJobParameters implements Map {
- private static final long serialVersionUID = 8019519109673698490L;
-
- /** Contains the actual configuration that is provided to Spouts and Bolts. */
- private final Map config = new HashMap();
-
- /**
- * Creates an empty configuration.
- */
- public StormConfig() {
- }
-
- /**
- * Creates an configuration with initial values provided by the given {@code Map}.
- *
- * @param config
- * Initial values for this configuration.
- */
- @SuppressWarnings("unchecked")
- public StormConfig(Map config) {
- this.config.putAll(config);
- }
-
-
- @Override
- public int size() {
- return this.config.size();
- }
-
- @Override
- public boolean isEmpty() {
- return this.config.isEmpty();
- }
-
- @Override
- public boolean containsKey(Object key) {
- return this.config.containsKey(key);
- }
-
- @Override
- public boolean containsValue(Object value) {
- return this.config.containsValue(value);
- }
-
- @Override
- public Object get(Object key) {
- return this.config.get(key);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Object put(Object key, Object value) {
- return this.config.put(key, value);
- }
-
- @Override
- public Object remove(Object key) {
- return this.config.remove(key);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void putAll(Map m) {
- this.config.putAll(m);
- }
-
- @Override
- public void clear() {
- this.config.clear();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Set<Object> keySet() {
- return this.config.keySet();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Collection<Object> values() {
- return this.config.values();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Set<java.util.Map.Entry<Object, Object>> entrySet() {
- return this.config.entrySet();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
deleted file mode 100644
index 7b35a64..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormCollector.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map.Entry;
-
-/**
- * A {@link AbstractStormCollector} transforms Storm tuples to Flink tuples.
- */
-abstract class AbstractStormCollector<OUT> {
-
- /** Flink output tuple of concrete type {@link Tuple0} to {@link Tuple25} per output stream. */
- protected final HashMap<String, Tuple> outputTuple = new HashMap<String, Tuple>();
- /** Flink split tuple. Used, if multiple output streams are declared. */
- private final SplitStreamType<Object> splitTuple = new SplitStreamType<Object>();
- /**
- * The number of attributes of the output tuples per stream. (Determines the concrete type of {@link #outputTuple}).
- * If {@link #numberOfAttributes} is zero, {@link #outputTuple} is not used and "raw" data type is used.
- */
- protected final HashMap<String, Integer> numberOfAttributes;
- /** Indicates of multiple output stream are declared and thus {@link SplitStreamType} must be used as output. */
- private final boolean split;
- /** Is set to {@code true} each time a tuple is emitted. */
- boolean tupleEmitted = false;
-
- /**
- * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via {@link #doEmit(Object)}. If the
- * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
- * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
- *
- * @param numberOfAttributes
- * The number of attributes of the emitted tuples per output stream.
- * @throws UnsupportedOperationException
- * if the specified number of attributes is greater than 25
- */
- public AbstractStormCollector(final HashMap<String, Integer> numberOfAttributes)
- throws UnsupportedOperationException {
- assert (numberOfAttributes != null);
-
- this.numberOfAttributes = numberOfAttributes;
- this.split = this.numberOfAttributes.size() > 1;
-
- for (Entry<String, Integer> outputStream : numberOfAttributes.entrySet()) {
- final int numAtt = outputStream.getValue();
- assert (numAtt >= -1);
-
- if (numAtt > 25) {
- throw new UnsupportedOperationException(
- "Flink cannot handle more then 25 attributes, but " + numAtt
- + " are declared for stream '" + outputStream.getKey()
- + "' by the given bolt");
- } else if (numAtt >= 0) {
- try {
- this.outputTuple.put(outputStream.getKey(),
- org.apache.flink.api.java.tuple.Tuple.getTupleClass(numAtt)
- .newInstance());
- } catch (final InstantiationException e) {
- throw new RuntimeException(e);
- } catch (final IllegalAccessException e) {
- throw new RuntimeException(e);
- }
-
- }
- }
- }
-
- /**
- * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via {@link #doEmit(Object)}
- * to the specified output stream.
- *
- * @param The
- * The output stream id.
- * @param tuple
- * The Storm tuple to be emitted.
- * @return the return value of {@link #doEmit(Object)}
- */
- @SuppressWarnings("unchecked")
- protected final List<Integer> tansformAndEmit(final String streamId, final List<Object> tuple) {
- List<Integer> taskIds;
-
- final int numAtt = this.numberOfAttributes.get(streamId);
- if (numAtt > -1) {
- assert (tuple.size() == numAtt);
- Tuple out = this.outputTuple.get(streamId);
- for (int i = 0; i < numAtt; ++i) {
- out.setField(tuple.get(i), i);
- }
- if (this.split) {
- this.splitTuple.streamId = streamId;
- this.splitTuple.value = out;
-
- taskIds = doEmit((OUT) this.splitTuple);
- } else {
- taskIds = doEmit((OUT) out);
- }
-
- } else {
- assert (tuple.size() == 1);
- if (split) {
- this.splitTuple.streamId = streamId;
- this.splitTuple.value = tuple.get(0);
-
- taskIds = doEmit((OUT) this.splitTuple);
- } else {
- taskIds = doEmit((OUT) tuple.get(0));
- }
- }
- this.tupleEmitted = true;
-
- return taskIds;
- }
-
- /**
- * Emits a Flink tuple.
- *
- * @param flinkTuple
- * The tuple to be emitted.
- * @return the IDs of the tasks this tuple was sent to
- */
- protected abstract List<Integer> doEmit(OUT flinkTuple);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
deleted file mode 100644
index ccd29bb..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/AbstractStormSpoutWrapper.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-import java.util.HashMap;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
-
-/**
- * A {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm bolt within a Flink
- * Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
- * {@link StormSpoutCollector} for supported types).<br />
- * <br />
- * <strong>CAUTION: currently, only simple spouts are supported! (ie, spouts that do not use the Storm configuration
- * <code>Map</code> or <code>TopologyContext</code> that is provided by the spouts's <code>prepare(..)</code> method.
- * Furthermore, ack and fail back calls as well as tuple IDs are not supported so far.</strong>
- */
-public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
- private static final long serialVersionUID = 4993283609095408765L;
-
- /** Number of attributes of the bolt's output tuples per stream. */
- private final HashMap<String, Integer> numberOfAttributes;
- /** The wrapped Storm {@link IRichSpout spout}. */
- protected final IRichSpout spout;
- /** The wrapper of the given Flink collector. */
- protected StormSpoutCollector<OUT> collector;
- /** Indicates, if the source is still running or was canceled. */
- protected volatile boolean isRunning = true;
- /** The original Storm topology. */
- protected StormTopology stormTopology;
-
- /**
- * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
- * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
- * {@link Tuple25} depending on the spout's declared number of attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [0;25].
- */
- public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
- this(spout, null);
- }
-
- /**
- * Instantiates a new {@link AbstractStormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such
- * that it can be used within a Flink streaming program. The output type can be any type if parameter
- * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
- * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
- * number of attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public AbstractStormSpoutWrapper(final IRichSpout spout,
- final Collection<String> rawOutputs)
- throws IllegalArgumentException {
- this.spout = spout;
- this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(spout, rawOutputs);
- }
-
- /**
- * Sets the original Storm topology.
- *
- * @param stormTopology
- * The original Storm topology.
- */
- public void setStormTopology(StormTopology stormTopology) {
- this.stormTopology = stormTopology;
- }
-
- @Override
- public final void run(final SourceContext<OUT> ctx) throws Exception {
- this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
-
- GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
- StormConfig stormConfig = new StormConfig();
-
- if (config != null) {
- if (config instanceof StormConfig) {
- stormConfig = (StormConfig) config;
- } else {
- stormConfig.putAll(config.toMap());
- }
- }
-
- this.spout.open(stormConfig,
- StormWrapperSetupHelper.createTopologyContext(
- (StreamingRuntimeContext) super.getRuntimeContext(),
- this.spout,
- this.stormTopology,
- null),
- new SpoutOutputCollector(this.collector));
- this.spout.activate();
- this.execute();
- }
-
- /**
- * Needs to be implemented to call the given Spout's {@link IRichSpout#nextTuple() nextTuple()} method. This method
- * might use a {@code while(true)}-loop to emit an infinite number of tuples.
- */
- protected abstract void execute();
-
- /**
- * {@inheritDoc}
- * <p/>
- * Sets the {@link #isRunning} flag to {@code false}.
- */
- @Override
- public void cancel() {
- this.isRunning = false;
- }
-
- @Override
- public void close() throws Exception {
- this.spout.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
deleted file mode 100644
index f499ecc..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/FiniteStormSpoutWrapper.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.stormcompatibility.util.FiniteStormSpout;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link FiniteStormSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls the wrapped
- * {@link FiniteStormSpout}'s {@link FiniteStormSpout#nextTuple()} method until {@link
- * FiniteStormSpout#reachedEnd()} is true.
- */
-public class FiniteStormSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
- private static final long serialVersionUID = -218340336648247605L;
-
- private FiniteStormSpout finiteSpout;
-
- /**
- * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
- * that it can be used within a Flink streaming program. The output type will be one of {@link Tuple0} to
- * {@link Tuple25} depending on the spout's declared number of attributes.
- *
- * @param spout
- * The Storm {@link FiniteStormSpout spout} to be used.
- * @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [0;25].
- */
- public FiniteStormSpoutWrapper(FiniteStormSpout spout)
- throws IllegalArgumentException {
- super(spout);
- this.finiteSpout = spout;
- }
-
- /**
- * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
- * that it can be used within a Flink streaming program. The output type can be any type if parameter
- * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
- * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
- * number of attributes.
- *
- * @param spout
- * The Storm {@link FiniteStormSpout spout} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final String[] rawOutputs)
- throws IllegalArgumentException {
- this(spout, Sets.newHashSet(rawOutputs));
- }
-
- /**
- * Instantiates a new {@link FiniteStormSpoutWrapper} that wraps the given Storm {@link FiniteStormSpout spout} such
- * that it can be used within a Flink streaming program. The output type can be any type if parameter
- * {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is
- * {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared
- * number of attributes.
- *
- * @param spout
- * The Storm {@link FiniteStormSpout spout} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public FiniteStormSpoutWrapper(final FiniteStormSpout spout, final Collection<String> rawOutputs)
- throws IllegalArgumentException {
- super(spout, rawOutputs);
- this.finiteSpout = spout;
- }
-
- /**
- * Calls the {@link FiniteStormSpout#nextTuple()} method until {@link FiniteStormSpout#reachedEnd()} is true or
- * {@link FiniteStormSpout#cancel()} is called.
- */
- @Override
- protected void execute() {
- while (super.isRunning && !finiteSpout.reachedEnd()) {
- finiteSpout.nextTuple();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
deleted file mode 100644
index 3cd27d4..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/SetupOutputFieldsDeclarer.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.HashMap;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * {@link SetupOutputFieldsDeclarer} is used by {@link StormWrapperSetupHelper} to determine the output streams and
- * number of attributes declared by the wrapped spout's or bolt's {@code declare(...)}/{@code declareStream(...)}
- * method.
- */
-class SetupOutputFieldsDeclarer implements OutputFieldsDeclarer {
-
- /** The number of attributes for each declared stream by the wrapped operator. */
- HashMap<String, Integer> outputSchemas = new HashMap<String, Integer>();
-
- @Override
- public void declare(final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
- }
-
- @Override
- public void declare(final boolean direct, final Fields fields) {
- this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
- }
-
- @Override
- public void declareStream(final String streamId, final Fields fields) {
- this.declareStream(streamId, false, fields);
- }
-
- @Override
- public void declareStream(final String streamId, final boolean direct, final Fields fields) {
- if (streamId == null) {
- throw new IllegalArgumentException("Stream ID cannot be null.");
- }
- if (direct) {
- throw new UnsupportedOperationException("Direct emit is not supported by Flink");
- }
-
- this.outputSchemas.put(streamId, fields.size());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
deleted file mode 100644
index e810214..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltCollector.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.tuple.Tuple;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.util.Collector;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A {@link StormBoltCollector} is used by {@link StormBoltWrapper} to provided an Storm compatible
- * output collector to the wrapped bolt. It transforms the emitted Storm tuples into Flink tuples
- * and emits them via the provide {@link Output} object.
- */
-class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {
-
- /** The Flink output Collector */
- private final Collector<OUT> flinkOutput;
-
- /**
- * Instantiates a new {@link StormBoltCollector} that emits Flink tuples to the given Flink output object. If the
- * number of attributes is negative, any output type is supported (ie, raw type). If the number of attributes is
- * between 0 and 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
- *
- * @param numberOfAttributes
- * The number of attributes of the emitted tuples per output stream.
- * @param flinkOutput
- * The Flink output object to be used.
- * @throws UnsupportedOperationException
- * if the specified number of attributes is greater than 25
- */
- public StormBoltCollector(final HashMap<String, Integer> numberOfAttributes,
- final Collector<OUT> flinkOutput) throws UnsupportedOperationException {
- super(numberOfAttributes);
- assert (flinkOutput != null);
- this.flinkOutput = flinkOutput;
- }
-
- @Override
- protected List<Integer> doEmit(final OUT flinkTuple) {
- this.flinkOutput.collect(flinkTuple);
- // TODO
- return null;
- }
-
- @Override
- public void reportError(final Throwable error) {
- // not sure, if Flink can support this
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- @Override
- public List<Integer> emit(final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
- return this.tansformAndEmit(streamId, tuple);
- }
-
- @Override
- public void emitDirect(final int taskId, final String streamId, final Collection<Tuple> anchors, final List<Object> tuple) {
- throw new UnsupportedOperationException("Direct emit is not supported by Flink");
- }
-
- @Override
- public void ack(final Tuple input) {
- throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
- }
-
- @Override
- public void fail(final Tuple input) {
- throw new UnsupportedOperationException("Currently, acking/failing is not supported by Flink");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
deleted file mode 100644
index 715d6df..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormBoltWrapper.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-import java.util.HashMap;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.tuple.Fields;
-
-import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.stormcompatibility.util.SplitStreamType;
-import org.apache.flink.stormcompatibility.util.StormConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link StormBoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming
- * program. It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the
- * bolt can process. Furthermore, it takes the bolt's output tuples and transforms them into Flink tuples of type
- * {@code OUT} (see {@link AbstractStormCollector} for supported types).<br />
- * <br />
- * <strong>CAUTION: currently, only simple bolts are supported! (ie, bolts that do not use the Storm configuration
- * <code>Map</code> or <code>TopologyContext</code> that is provided by the bolt's <code>open(..)</code> method.
- * Furthermore, acking and failing of tuples as well as accessing tuple attributes by field names is not supported so
- * far.</strong>
- */
-public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
- private static final long serialVersionUID = -4788589118464155835L;
-
- /** The wrapped Storm {@link IRichBolt bolt}. */
- private final IRichBolt bolt;
- /** Number of attributes of the bolt's output tuples per stream. */
- private final HashMap<String, Integer> numberOfAttributes;
- /** The schema (ie, ordered field names) of the input stream. */
- private final Fields inputSchema;
- /** The original Storm topology. */
- protected StormTopology stormTopology;
-
- /**
- * We have to use this because Operators must output
- * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}.
- */
- private TimestampedCollector<OUT> flinkCollector;
-
- /**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
- * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
- * for POJO input types. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's
- * declared number of attributes.
- *
- * @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
- * @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [0;25].
- */
- public StormBoltWrapper(final IRichBolt bolt) throws IllegalArgumentException {
- this(bolt, null, (Collection<String>) null);
- }
-
- /**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
- * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
- * {@link Tuple0} to {@link Tuple25}. The output type will be one of {@link Tuple0} to {@link Tuple25} depending on
- * the bolt's declared number of attributes.
- *
- * @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
- * @param inputSchema
- * The schema (ie, ordered field names) of the input stream.
- * @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [0;25].
- */
- public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema)
- throws IllegalArgumentException {
- this(bolt, inputSchema, (Collection<String>) null);
- }
-
- /**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
- * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
- * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
- * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
- * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
- *
- * @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
- */
- public StormBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
- throws IllegalArgumentException {
- this(bolt, null, Sets.newHashSet(rawOutputs));
- }
-
- /**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
- * used within a Flink streaming program. As no input schema is defined, attribute-by-name access in only possible
- * for POJO input types. The output type can be any type if parameter {@code rawOutput} is {@code true} and the
- * bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one
- * of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
- *
- * @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [1;25].
- */
- public StormBoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
- throws IllegalArgumentException {
- this(bolt, null, rawOutputs);
- }
-
- /**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
- * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
- * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
- * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
- * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
- *
- * @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
- * @param inputSchema
- * The schema (ie, ordered field names) of the input stream.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
- final String[] rawOutputs) throws IllegalArgumentException {
- this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
- }
-
- /**
- * Instantiates a new {@link StormBoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be
- * used within a Flink streaming program. The given input schema enable attribute-by-name access for input types
- * {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
- * and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
- * be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
- *
- * @param bolt
- * The Storm {@link IRichBolt bolt} to be used.
- * @param inputSchema
- * The schema (ie, ordered field names) of the input stream.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public StormBoltWrapper(final IRichBolt bolt, final Fields inputSchema,
- final Collection<String> rawOutputs) throws IllegalArgumentException {
- this.bolt = bolt;
- this.inputSchema = inputSchema;
- this.numberOfAttributes = StormWrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
- }
-
- /**
- * Sets the original Storm topology.
- *
- * @param stormTopology
- * The original Storm topology.
- */
- public void setStormTopology(StormTopology stormTopology) {
- this.stormTopology = stormTopology;
- }
-
- @Override
- public void open(final Configuration parameters) throws Exception {
- super.open(parameters);
-
- final TopologyContext topologyContext = StormWrapperSetupHelper.createTopologyContext(
- super.runtimeContext, this.bolt, this.stormTopology, null);
- flinkCollector = new TimestampedCollector<OUT>(output);
- OutputCollector stormCollector = null;
-
- if (this.numberOfAttributes.size() > 0) {
- stormCollector = new OutputCollector(new StormBoltCollector<OUT>(
- this.numberOfAttributes, flinkCollector));
- }
-
- GlobalJobParameters config = super.executionConfig.getGlobalJobParameters();
- StormConfig stormConfig = new StormConfig();
-
- if (config != null) {
- if (config instanceof StormConfig) {
- stormConfig = (StormConfig) config;
- } else {
- stormConfig.putAll(config.toMap());
- }
- }
-
- this.bolt.prepare(stormConfig, topologyContext, stormCollector);
- }
-
- @Override
- public void dispose() {
- this.bolt.cleanup();
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void processElement(final StreamRecord<IN> element) throws Exception {
- flinkCollector.setTimestamp(element.getTimestamp());
- IN value = element.getValue();
- if (value instanceof SplitStreamType) {
- this.bolt.execute(new StormTuple<IN>(((SplitStreamType<IN>) value).value,
- inputSchema));
- } else {
- this.bolt.execute(new StormTuple<IN>(value, inputSchema));
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- output.emitWatermark(mark);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
deleted file mode 100644
index 45eb56c..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormFiniteSpoutWrapper.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import java.util.Collection;
-
-import backtype.storm.topology.IRichSpout;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple25;
-
-import com.google.common.collect.Sets;
-
-/**
- * A {@link StormFiniteSpoutWrapper} is an {@link AbstractStormSpoutWrapper} that calls {@link IRichSpout#nextTuple()
- * nextTuple()} for finite number of times before
- * {@link #run(org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext)} returns. The number of
- * {@code nextTuple()} calls can be specified as a certain number of invocations or can be undefined. In the undefined
- * case, the {@code run(...)} method return if no record was emitted to the output collector for the first time.
- */
-public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
- private static final long serialVersionUID = 3883246587044801286L;
-
- /** The number of {@link IRichSpout#nextTuple()} calls */
- private int numberOfInvocations;
-
- /**
- * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
- * output type will be one of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of
- * attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [0;25].
- */
- public StormFiniteSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
- this(spout, (Collection<String>) null, -1);
- }
-
- /**
- * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type will be one
- * of {@link Tuple0} to {@link Tuple25} depending on the spout's declared number of attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param numberOfInvocations
- * The number of calls to {@link IRichSpout#nextTuple()}.
- * @throws IllegalArgumentException
- * If the number of declared output attributes is not with range [0;25].
- */
- public StormFiniteSpoutWrapper(final IRichSpout spout, final int numberOfInvocations)
- throws IllegalArgumentException {
- this(spout, (Collection<String>) null, numberOfInvocations);
- }
-
- /**
- * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
- * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
- * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
- * {@link Tuple25} depending on the spout's declared number of attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
- throws IllegalArgumentException {
- this(spout, Sets.newHashSet(rawOutputs), -1);
- }
-
- /**
- * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} as long as it emits records to the output collector. The
- * output type can be any type if parameter {@code rawOutput} is {@code true} and the spout's number of declared
- * output tuples is 1. If {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to
- * {@link Tuple25} depending on the spout's declared number of attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs)
- throws IllegalArgumentException {
- this(spout, rawOutputs, -1);
- }
-
- /**
- * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
- * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
- * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on
- * the spout's declared number of attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @param numberOfInvocations
- * The number of calls to {@link IRichSpout#nextTuple()}.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public StormFiniteSpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
- final int numberOfInvocations) throws IllegalArgumentException {
- super(spout, Sets.newHashSet(rawOutputs));
- this.numberOfInvocations = numberOfInvocations;
- }
-
- /**
- * Instantiates a new {@link StormFiniteSpoutWrapper} that calls the {@link IRichSpout#nextTuple() nextTuple()}
- * method of the given Storm {@link IRichSpout spout} {@code numberOfInvocations} times. The output type can be any
- * type if parameter {@code rawOutput} is {@code true} and the spout's number of declared output tuples is 1. If
- * {@code rawOutput} is {@code false} the output type will be one of {@link Tuple0} to {@link Tuple25} depending on
- * the spout's declared number of attributes.
- *
- * @param spout
- * The Storm {@link IRichSpout spout} to be used.
- * @param rawOutputs
- * Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
- * of a raw type.
- * @param numberOfInvocations
- * The number of calls to {@link IRichSpout#nextTuple()}.
- * @throws IllegalArgumentException
- * If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
- * {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
- * [0;25].
- */
- public StormFiniteSpoutWrapper(final IRichSpout spout, final Collection<String> rawOutputs,
- final int numberOfInvocations) throws IllegalArgumentException {
- super(spout, rawOutputs);
- this.numberOfInvocations = numberOfInvocations;
- }
-
- /**
- * Calls {@link IRichSpout#nextTuple()} for the given number of times.
- */
- @Override
- protected void execute() {
- if (this.numberOfInvocations >= 0) {
- while ((--this.numberOfInvocations >= 0) && super.isRunning) {
- super.spout.nextTuple();
- }
- } else {
- do {
- super.collector.tupleEmitted = false;
- super.spout.nextTuple();
- } while (super.collector.tupleEmitted && super.isRunning);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4cb96708/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java b/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
deleted file mode 100644
index 5a20056..0000000
--- a/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-core/src/main/java/org/apache/flink/stormcompatibility/wrappers/StormSpoutCollector.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.stormcompatibility.wrappers;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-
-import org.apache.flink.api.java.tuple.Tuple0;
-import org.apache.flink.api.java.tuple.Tuple25;
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-
-import java.util.HashMap;
-import java.util.List;
-
-/**
- * A {@link StormSpoutCollector} is used by {@link AbstractStormSpoutWrapper} to provided an Storm
- * compatible output collector to the wrapped spout. It transforms the emitted Storm tuples into
- * Flink tuples and emits them via the provide {@link SourceContext} object.
- */
-class StormSpoutCollector<OUT> extends AbstractStormCollector<OUT> implements ISpoutOutputCollector {
-
- /** The Flink source context object */
- private final SourceContext<OUT> flinkContext;
-
- /**
- * Instantiates a new {@link StormSpoutCollector} that emits Flink tuples to the given Flink source context. If the
- * number of attributes is specified as zero, any output type is supported. If the number of attributes is between 0
- * to 25, the output type is {@link Tuple0} to {@link Tuple25}, respectively.
- *
- * @param numberOfAttributes
- * The number of attributes of the emitted tuples.
- * @param flinkContext
- * The Flink source context to be used.
- * @throws UnsupportedOperationException
- * if the specified number of attributes is greater than 25
- */
- public StormSpoutCollector(final HashMap<String, Integer> numberOfAttributes,
- final SourceContext<OUT> flinkContext) throws UnsupportedOperationException {
- super(numberOfAttributes);
- assert (flinkContext != null);
- this.flinkContext = flinkContext;
- }
-
- @Override
- protected List<Integer> doEmit(final OUT flinkTuple) {
- this.flinkContext.collect(flinkTuple);
- // TODO
- return null;
- }
-
- @Override
- public void reportError(final Throwable error) {
- // not sure, if Flink can support this
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
- @Override
- public List<Integer> emit(final String streamId, final List<Object> tuple, final Object messageId) {
- return this.tansformAndEmit(streamId, tuple);
- }
-
-
- @Override
- public void emitDirect(final int taskId, final String streamId, final List<Object> tuple, final Object messageId) {
- throw new UnsupportedOperationException("Direct emit is not supported by Flink");
- }
-
-}