You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:15 UTC

[10/34] incubator-flink git commit: [streaming] Added abstract class ExtractionAwareDeltaFunction to enable easy extraction of data in pre-defined delta functions.

[streaming] Added abstract class ExtractionAwareDeltaFunction to enable easy extraction of data in pre-defined delta functions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b2636247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b2636247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b2636247

Branch: refs/heads/master
Commit: b2636247616fe152f85b3afeb03da3fa15689b09
Parents: 997d3e1
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Mon Oct 27 14:02:21 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:45:08 2014 +0100

----------------------------------------------------------------------
 .../ExtractionAwareDeltaFunction.java           | 90 ++++++++++++++++++++
 1 file changed, 90 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b2636247/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
new file mode 100644
index 0000000..81d01a4
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/deltafunction/ExtractionAwareDeltaFunction.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.deltafunction;
+
+import org.apache.flink.streaming.api.windowing.extractor.Extractor;
+
+/**
+ * Extend this abstract class to implement a delta function which is aware of
+ * extracting the data on which the delta is calculated from a more complex data
+ * structure. For example in case you want to be able to run a delta only on one
+ * field of a Tuple type or only on some fields from an array.
+ * 
+ * @param <DATA>
+ *            The input data type. The input of this type will be passed to the
+ *            extractor which will transform into a TO-object. The delta
+ *            function then runs on this TO-object.
+ * @param <TO>
+ *            The type on which the delta function runs. (The type of the delta
+ *            function)
+ */
+public abstract class ExtractionAwareDeltaFunction<DATA, TO> implements DeltaFunction<DATA> {
+
+	/**
+	 * Generated Version ID
+	 */
+	private static final long serialVersionUID = 6927486219702689554L;
+	private Extractor<DATA, TO> converter;
+
+	public ExtractionAwareDeltaFunction(Extractor<DATA, TO> converter) {
+		this.converter = converter;
+	}
+
+	/**
+	 * This method takes the two data point and runs the set extractor on it.
+	 * The delta function implemented at {@link getNestedDelta} is then called
+	 * with the extracted data. In case no extractor is set the input data gets
+	 * passes to {@link getNestedDelta} as-is. The return value is just
+	 * forwarded from {@link getNestedDelta}.
+	 * 
+	 * @param oldDataPoint
+	 *            the older data point as raw data (before extraction).
+	 * @param newDataPoint
+	 *            the new data point as raw data (before extraction).
+	 * @return the delta between the two points.
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public double getDelta(DATA oldDataPoint, DATA newDataPoint) {
+		if (converter == null) {
+			// In case no conversion/extraction is required, we can cast DATA to
+			// TO
+			// => Therefore, "unchecked" warning is suppressed for this method.
+			return getNestedDelta((TO) oldDataPoint, (TO) newDataPoint);
+		} else {
+			return getNestedDelta(converter.extract(oldDataPoint), converter.extract(newDataPoint));
+		}
+
+	}
+
+	/**
+	 * This method is exactly the same as
+	 * {@link DeltaFunction#getDelta(Object, Object)} except that it gets the
+	 * result of the previously done extractions as input. Therefore, this
+	 * method only does the actual calculation of the delta but no data
+	 * extraction or conversion.
+	 * 
+	 * @param oldDataPoint
+	 *            the older data point.
+	 * @param newDataPoint
+	 *            the new data point.
+	 * @return the delta between the two points.
+	 */
+	public abstract double getNestedDelta(TO oldDataPoint, TO newDataPoint);
+
+}