Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/27 14:11:43 UTC

[2/3] flink git commit: [FLINK-8992] [e2e-tests] Integrate general DataStream test job with project structure

http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java
deleted file mode 100644
index c19924e..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateBuilder;
-
-public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialKeyedStateBuilder<IN> {
-
-	private transient ValueState<STATE> valueState;
-	private final TypeSerializer<STATE> typeSerializer;
-	private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
-
-	public ArtificialValueStateBuilder(
-		String stateName,
-		JoinFunction<IN, STATE, STATE> stateValueGenerator,
-		TypeSerializer<STATE> typeSerializer) {
-
-		super(stateName);
-		this.typeSerializer = typeSerializer;
-		this.stateValueGenerator = stateValueGenerator;
-	}
-
-
-	@Override
-	public void artificialStateForElement(IN event) throws Exception {
-		valueState.update(stateValueGenerator.join(event, valueState.value()));
-	}
-
-	@Override
-	public void initialize(FunctionInitializationContext initializationContext) {
-		ValueStateDescriptor<STATE> valueStateDescriptor =
-			new ValueStateDescriptor<>(stateName, typeSerializer);
-		valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
-	}
-}

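For context on the class removed above: the builder lazily registers a ValueState in initialize() and folds every incoming event into that state through the supplied JoinFunction. A minimal usage sketch follows; the constructor signature is taken from the deleted class, while the KryoSerializer wiring and the lambda body are illustrative assumptions, not code from this commit:

    // Fold each event into keyed ValueState: the JoinFunction receives the
    // event and the current state value and returns the new state value.
    // KryoSerializer is org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer,
    // ExecutionConfig is org.apache.flink.api.common.ExecutionConfig.
    ArtificialValueStateBuilder<Event, ComplexPayload> stateBuilder =
        new ArtificialValueStateBuilder<>(
            "valueState",                                    // name used for the ValueStateDescriptor
            (event, oldState) -> new ComplexPayload(event),  // JoinFunction<IN, STATE, STATE>
            new KryoSerializer<>(ComplexPayload.class, new ExecutionConfig()));
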
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java
deleted file mode 100644
index e3a372b..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate.eventpayload;
-
-import org.apache.flink.streaming.tests.general.Event;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-public class ComplexPayload implements Serializable {
-
-	public ComplexPayload(Event event) {
-		this.eventTime = event.getEventTime();
-		this.innerPayLoad = new InnerPayLoad(event.getSequenceNumber());
-		this.stringList = Arrays.asList(String.valueOf(event.getKey()), event.getPayload());
-	}
-
-	private final long eventTime;
-	private final List<String> stringList;
-	private final InnerPayLoad innerPayLoad;
-
-	public static class InnerPayLoad implements Serializable {
-		private final long sequenceNumber;
-
-		public InnerPayLoad(long sequenceNumber) {
-			this.sequenceNumber = sequenceNumber;
-		}
-
-		public long getSequenceNumber() {
-			return sequenceNumber;
-		}
-	}
-
-	public long getEventTime() {
-		return eventTime;
-	}
-
-	public List<String> getStringList() {
-		return stringList;
-	}
-
-	public InnerPayLoad getInnerPayLoad() {
-		return innerPayLoad;
-	}
-}
-

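ComplexPayload, removed above, is an immutable Serializable holder built from a single Event, with a nested InnerPayLoad carrying the sequence number. A short sketch of populating it and reading it back; the getters mirror the deleted class, while the surrounding method is illustrative only:

    // Given an Event from the job's source, snapshot it into a payload and
    // read the copied fields back out (List is java.util.List).
    static void inspectPayload(Event event) {
        ComplexPayload payload = new ComplexPayload(event);
        long eventTime = payload.getEventTime();           // copied from event.getEventTime()
        List<String> parts = payload.getStringList();      // [String.valueOf(key), event payload]
        long sequence = payload.getInnerPayLoad().getSequenceNumber();
    }
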
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index 621eac1..d3b3118 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -23,18 +23,14 @@ if [ -z $1 ] || [ -z $2 ]; then
 fi
 
 source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh
-
-setup_kafka_dist
-start_kafka_cluster
 
 ORIGINAL_DOP=$1
 NEW_DOP=$2
 
 if (( $ORIGINAL_DOP >= $NEW_DOP )); then
-  NUM_SLOTS=$(( $ORIGINAL_DOP + 1 ))
+  NUM_SLOTS=$ORIGINAL_DOP
 else
-  NUM_SLOTS=$(( $NEW_DOP + 1 ))
+  NUM_SLOTS=$NEW_DOP
 fi
 
 # modify configuration to have enough slots
@@ -55,8 +51,6 @@ function test_cleanup {
   # don't call ourselves again for normal exit
   trap "" EXIT
 
-  stop_kafka_cluster
-
   # revert our modifications to the Flink distribution
   mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
   rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
@@ -67,42 +61,39 @@ function test_cleanup {
 trap test_cleanup INT
 trap test_cleanup EXIT
 
-# create the required topic
-create_kafka_topic 1 1 test-input
-
-# run the state machine example job
-STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $FLINK_DIR/examples/streaming/StateMachineExample.jar \
-  --kafka-topic test-input \
-  | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $STATE_MACHINE_JOB
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
 
-# then, run the events generator
-EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \
-  --kafka-topic test-input --sleep 15 \
+# run the DataStream allround job
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+  --test.semantics exactly-once \
+  --environment.parallelism $ORIGINAL_DOP \
+  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+  --sequence_generator_source.sleep_time 15 \
+  --sequence_generator_source.sleep_after_elements 1 \
   | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
-wait_job_running $EVENTS_GEN_JOB
+wait_job_running $DATASTREAM_JOB
 
-function get_metric_state_machine_processed_records {
-  grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
+function get_metric_processed_records {
+  grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
 }
 
 function get_num_metric_samples {
-  grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
+  grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
 }
 
 # monitor the numRecordsIn metric of the state machine operator;
 # only proceed to savepoint when the operator has processed 200 records
 while : ; do
-  NUM_RECORDS=$(get_metric_state_machine_processed_records)
+  NUM_RECORDS=$(get_metric_processed_records)
 
   if [ -z $NUM_RECORDS ]; then
     NUM_RECORDS=0
   fi
 
   if (( $NUM_RECORDS < 200 )); then
-    echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+    echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
     sleep 1
   else
     break
@@ -110,27 +101,31 @@ while : ; do
 done
 
 # take a savepoint of the state machine job
-SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+SAVEPOINT_PATH=$(take_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
   | grep "Savepoint completed. Path:" | sed 's/.* //g')
 
-cancel_job $STATE_MACHINE_JOB
+cancel_job $DATASTREAM_JOB
 
 # Since it is not possible to differentiate reporter output between the first and second execution,
 # we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
 OLD_NUM_METRICS=$(get_num_metric_samples)
 
 # resume state machine job with savepoint
-STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
-  --kafka-topic test-input \
+DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+  --test.semantics exactly-once \
+  --environment.parallelism $NEW_DOP \
+  --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+  --sequence_generator_source.sleep_time 15 \
+  --sequence_generator_source.sleep_after_elements 1 \
   | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
-wait_job_running $STATE_MACHINE_JOB
+wait_job_running $DATASTREAM_JOB
 
 # monitor the numRecordsIn metric of the state machine operator in the second execution
 # we let the test finish once the second restore execution has processed 200 records
 while : ; do
   NUM_METRICS=$(get_num_metric_samples)
-  NUM_RECORDS=$(get_metric_state_machine_processed_records)
+  NUM_RECORDS=$(get_metric_processed_records)
 
   # only account for metrics that appeared in the second execution
   if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
@@ -138,7 +133,7 @@ while : ; do
   fi
 
   if (( $NUM_RECORDS < 200 )); then
-    echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+    echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
     sleep 1
   else
     break
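
The flags the script now passes to DataStreamAllroundTestProgram.jar follow Flink's usual ParameterTool conventions. A hedged sketch of how the job presumably reads them; the parameter names are copied from the script above, while the defaults and the skeleton class are assumptions rather than the actual job code:

    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AllroundParameterSketch {
        public static void main(String[] args) throws Exception {
            ParameterTool pt = ParameterTool.fromArgs(args);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(pt.getInt("environment.parallelism", 1));

            // "exactly-once" selects the processing guarantee for the test run.
            String semantics = pt.get("test.semantics", "exactly-once");

            // Source throttling flags, as set by the script.
            long sleepTime = pt.getLong("sequence_generator_source.sleep_time", 0L);
            long sleepAfterElements = pt.getLong("sequence_generator_source.sleep_after_elements", 0L);

            // Checkpoint directory handed to the configured state backend.
            String checkpointDir = pt.get("state_backend.checkpoint_directory");

            // ... assemble the actual pipeline here ...
            env.execute("General purpose test job");  // name matched by the metric greps above
        }
    }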