You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:09 UTC

[2/9] flink git commit: [hotfix] Remove no longer used Generic State classes

[hotfix] Remove no longer used Generic State classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/254a7007
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/254a7007
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/254a7007

Branch: refs/heads/master
Commit: 254a70072aa3bbd51277e99e0982c5e29908d684
Parents: 51c02ee
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 13 14:52:40 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:22:21 2017 +0100

----------------------------------------------------------------------
 .../runtime/state/GenericFoldingState.java      | 80 -------------------
 .../flink/runtime/state/GenericListState.java   | 84 --------------------
 .../runtime/state/GenericReducingState.java     | 83 -------------------
 3 files changed, 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/254a7007/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
deleted file mode 100644
index ee2d86d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.ValueState;
-
-/**
- * Generic implementation of {@link FoldingState} based on a wrapped {@link ValueState}.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that can be folded into the state.
- * @param <ACC> The type of the value in the folding state.
- * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
- */
-public class GenericFoldingState<N, T, ACC, W extends ValueState<ACC> & KvState<N>>
-	implements FoldingState<T, ACC>, KvState<N> {
-
-	private final W wrappedState;
-	private final FoldFunction<T, ACC> foldFunction;
-
-	/**
-	 * Creates a new {@code FoldingState} that wraps the given {@link ValueState}. The
-	 * {@code ValueState} must have the initial value of the fold as default value.
-	 *
-	 * @param wrappedState The wrapped {@code ValueState}
-	 * @param foldFunction The {@code FoldFunction} to use for folding values into the state
-	 */
-	@SuppressWarnings("unchecked")
-	public GenericFoldingState(ValueState<ACC> wrappedState, FoldFunction<T, ACC> foldFunction) {
-		if (!(wrappedState instanceof KvState)) {
-			throw new IllegalArgumentException("Wrapped state must be a KvState.");
-		}
-		this.wrappedState = (W) wrappedState;
-		this.foldFunction = foldFunction;
-	}
-
-	@Override
-	public void setCurrentNamespace(N namespace) {
-		wrappedState.setCurrentNamespace(namespace);
-	}
-
-	@Override
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		return wrappedState.getSerializedValue(serializedKeyAndNamespace);
-	}
-
-	@Override
-	public ACC get() throws Exception {
-		return wrappedState.value();
-	}
-
-	@Override
-	public void add(T value) throws Exception {
-		ACC currentValue = wrappedState.value();
-		wrappedState.update(foldFunction.fold(currentValue, value));
-	}
-
-	@Override
-	public void clear() {
-		wrappedState.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/254a7007/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
deleted file mode 100644
index ba81837..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ValueState;
-
-import java.util.ArrayList;
-
-/**
- * Generic implementation of {@link ListState} based on a wrapped {@link ValueState}.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values stored in this {@code ListState}.
- * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
- */
-public class GenericListState<N, T, W extends ValueState<ArrayList<T>> & KvState<N>>
-	implements ListState<T>, KvState<N> {
-
-	private final W wrappedState;
-
-	/**
-	 * Creates a new {@code ListState} that wraps the given {@link ValueState}. The
-	 * {@code ValueState} must have a default value of {@code null}.
-	 *
-	 * @param wrappedState The wrapped {@code ValueState}
-	 */
-	@SuppressWarnings("unchecked")
-	public GenericListState(ValueState<ArrayList<T>> wrappedState) {
-		if (!(wrappedState instanceof KvState)) {
-			throw new IllegalArgumentException("Wrapped state must be a KvState.");
-		}
-		this.wrappedState = (W) wrappedState;
-	}
-
-	@Override
-	public void setCurrentNamespace(N namespace) {
-		wrappedState.setCurrentNamespace(namespace);
-	}
-
-	@Override
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		return wrappedState.getSerializedValue(serializedKeyAndNamespace);
-	}
-
-	@Override
-	public Iterable<T> get() throws Exception {
-		return wrappedState.value();
-	}
-
-	@Override
-	public void add(T value) throws Exception {
-		ArrayList<T> currentValue = wrappedState.value();
-		if (currentValue == null) {
-			currentValue = new ArrayList<>();
-			currentValue.add(value);
-			wrappedState.update(currentValue);
-		} else {
-			currentValue.add(value);
-			wrappedState.update(currentValue);
-		}
-	}
-
-	@Override
-	public void clear() {
-		wrappedState.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/254a7007/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
deleted file mode 100644
index 214231e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ValueState;
-
-/**
- * Generic implementation of {@link ReducingState} based on a wrapped {@link ValueState}.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values stored in this {@code ReducingState}.
- * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
- */
-public class GenericReducingState<N, T, W extends ValueState<T> & KvState<N>>
-	implements ReducingState<T>, KvState<N> {
-
-	private final W wrappedState;
-	private final ReduceFunction<T> reduceFunction;
-
-	/**
-	 * Creates a new {@code ReducingState} that wraps the given {@link ValueState}. The
-	 * {@code ValueState} must have a default value of {@code null}.
-	 *
-	 * @param wrappedState The wrapped {@code ValueState}
-	 * @param reduceFunction The {@code ReduceFunction} to use for combining values.
-	 */
-	@SuppressWarnings("unchecked")
-	public GenericReducingState(ValueState<T> wrappedState, ReduceFunction<T> reduceFunction) {
-		if (!(wrappedState instanceof KvState)) {
-			throw new IllegalArgumentException("Wrapped state must be a KvState.");
-		}
-		this.wrappedState = (W) wrappedState;
-		this.reduceFunction = reduceFunction;
-	}
-
-	@Override
-	public void setCurrentNamespace(N namespace) {
-		wrappedState.setCurrentNamespace(namespace);
-	}
-
-	@Override
-	public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
-		return wrappedState.getSerializedValue(serializedKeyAndNamespace);
-	}
-
-	@Override
-	public T get() throws Exception {
-		return wrappedState.value();
-	}
-
-	@Override
-	public void add(T value) throws Exception {
-		T currentValue = wrappedState.value();
-		if (currentValue == null) {
-			wrappedState.update(value);
-		} else {
-			wrappedState.update(reduceFunction.reduce(currentValue, value));
-		}
-	}
-
-	@Override
-	public void clear() {
-		wrappedState.clear();
-	}
-}