You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2018/05/11 17:02:12 UTC

[GitHub] flink pull request #5807: [FLINK-8982][E2E Tests] Add test for known failure...

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5807#discussion_r187674099
  
    --- Diff: flink-end-to-end-tests/flink-queryable-state-test/src/main/java/org/apache/flink/streaming/tests/queryablestate/QsStateProducer.java ---
    @@ -0,0 +1,182 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.tests.queryablestate;
    +
    +import org.apache.flink.api.common.functions.RichFlatMapFunction;
    +import org.apache.flink.api.common.state.MapState;
    +import org.apache.flink.api.common.state.MapStateDescriptor;
    +import org.apache.flink.api.common.typeinfo.TypeHint;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.utils.ParameterTool;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.runtime.state.StateBackend;
    +import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    +import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    +import org.apache.flink.util.Collector;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
    +
    +import java.time.Duration;
    +import java.time.Instant;
    +import java.util.Random;
    +
    +/**
    + * Streaming application that creates an {@link Email} pojo with random ids and increasing
    + * timestamps and passes it to a stateful {@link org.apache.flink.api.common.functions.FlatMapFunction},
    + * where it is exposed as queryable state.
    + */
    +public class QsStateProducer {
    +
    +	public static final String QUERYABLE_STATE_NAME = "state";
    +	public static final String STATE_NAME = "state";
    +
    +	public static void main(final String[] args) throws Exception {
    +		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		ParameterTool tool = ParameterTool.fromArgs(args);
    +		String tmpPath = tool.getRequired("tmp-dir");
    +		String stateBackendType = tool.getRequired("state-backend");
    +
    +		StateBackend stateBackend;
    +		switch (stateBackendType) {
    +			case "rocksdb":
    +				stateBackend = new RocksDBStateBackend(tmpPath);
    +				break;
    +			case "fs":
    +				stateBackend = new FsStateBackend(tmpPath);
    +				break;
    +			case "memory":
    +				stateBackend = new MemoryStateBackend();
    +				break;
    +			default:
    +				throw new RuntimeException("Unsupported state backend " + stateBackendType);
    +		}
    +
    +		env.setStateBackend(stateBackend);
    +		env.enableCheckpointing(1000L);
    +		env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    +		env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
    +
    +		env.addSource(new EmailSource())
    +			.keyBy(new KeySelector<Email, String>() {
    +
    +				private static final long serialVersionUID = -1480525724620425363L;
    +
    +				@Override
    +				public String getKey(Email value) throws Exception {
    +					return "";
    +				}
    +			})
    +			.flatMap(new MyFlatMap());
    +
    +		env.execute();
    +	}
    +
    +	private static class EmailSource extends RichSourceFunction<Email> {
    +
    +		private static final long serialVersionUID = -7286937645300388040L;
    +
    +		private Random random;
    +		private volatile boolean isRunning = true;
    +
    +		@Override
    +		public void open(Configuration parameters) throws Exception {
    +			super.open(parameters);
    +			this.random = new Random();
    +		}
    +
    +		@Override
    +		public void run(SourceContext<Email> ctx) throws Exception {
    +			// Sleep for 10 seconds on start to allow time to copy jobid
    +			for (int i = 0; i < 100 && isRunning; i++) {
    +				Thread.sleep(100L);
    +			}
    +
    +			int types = LabelSurrogate.Type.values().length;
    +
    +			while (isRunning) {
    +				int r = random.nextInt(100);
    +
    +				final EmailId emailId = new EmailId(Integer.toString(random.nextInt()));
    +				final Instant timestamp = Instant.now().minus(Duration.ofDays(1L));
    +				final String foo = String.format("foo #%d", r);
    +				final LabelSurrogate label = new LabelSurrogate(LabelSurrogate.Type.values()[r % types], "bar");
    +
    --- End diff --
    
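    The source has to take the checkpoint lock (`ctx.getCheckpointLock()`) before emitting, so that a record is never emitted while a checkpoint is being taken: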
    
    ```
    synchronized (ctx.getCheckpointLock()) {
    	ctx.collect(new Email(emailId, timestamp, foo, label));
    }
    ```


---