Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/27 14:11:43 UTC
[2/3] flink git commit: [FLINK-8992] [e2e-tests] Integrate general DataStream test job with project structure
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java
deleted file mode 100644
index c19924e..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ArtificialValueStateBuilder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate.eventpayload;
-
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.streaming.tests.general.artificialstate.ArtificialKeyedStateBuilder;
-
-public class ArtificialValueStateBuilder<IN, STATE> extends ArtificialKeyedStateBuilder<IN> {
-
- private transient ValueState<STATE> valueState;
- private final TypeSerializer<STATE> typeSerializer;
- private final JoinFunction<IN, STATE, STATE> stateValueGenerator;
-
- public ArtificialValueStateBuilder(
- String stateName,
- JoinFunction<IN, STATE, STATE> stateValueGenerator,
- TypeSerializer<STATE> typeSerializer) {
-
- super(stateName);
- this.typeSerializer = typeSerializer;
- this.stateValueGenerator = stateValueGenerator;
- }
-
-
- @Override
- public void artificialStateForElement(IN event) throws Exception {
- valueState.update(stateValueGenerator.join(event, valueState.value()));
- }
-
- @Override
- public void initialize(FunctionInitializationContext initializationContext) {
- ValueStateDescriptor<STATE> valueStateDescriptor =
- new ValueStateDescriptor<>(stateName, typeSerializer);
- valueState = initializationContext.getKeyedStateStore().getState(valueStateDescriptor);
- }
-}
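For reference, the builder removed from this path is constructed from a state name, a JoinFunction that folds each incoming element into the current keyed state value, and a TypeSerializer for the state type. A minimal usage sketch, assuming a String input type and a running Long count (the state name "exampleValueState" and the LongSerializer choice are illustrative, not taken from this commit):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeutils.base.LongSerializer;

// Sketch only: keeps a per-key running count in ValueState, updated once per element.
ArtificialValueStateBuilder<String, Long> countStateBuilder =
    new ArtificialValueStateBuilder<>(
        "exampleValueState",                               // hypothetical state name
        (JoinFunction<String, Long, Long>) (element, currentCount) ->
            currentCount == null ? 1L : currentCount + 1L, // fold the element into the state
        LongSerializer.INSTANCE);                          // serializer for the Long state type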
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java b/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java
deleted file mode 100644
index e3a372b..0000000
--- a/flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/general/artificialstate/eventpayload/ComplexPayload.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.tests.general.artificialstate.eventpayload;
-
-import org.apache.flink.streaming.tests.general.Event;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-public class ComplexPayload implements Serializable {
-
- public ComplexPayload(Event event) {
- this.eventTime = event.getEventTime();
- this.innerPayLoad = new InnerPayLoad(event.getSequenceNumber());
- this.stringList = Arrays.asList(String.valueOf(event.getKey()), event.getPayload());
- }
-
- private final long eventTime;
- private final List<String> stringList;
- private final InnerPayLoad innerPayLoad;
-
- public static class InnerPayLoad implements Serializable {
- private final long sequenceNumber;
-
- public InnerPayLoad(long sequenceNumber) {
- this.sequenceNumber = sequenceNumber;
- }
-
- public long getSequenceNumber() {
- return sequenceNumber;
- }
- }
-
- public long getEventTime() {
- return eventTime;
- }
-
- public List<String> getStringList() {
- return stringList;
- }
-
- public InnerPayLoad getInnerPayLoad() {
- return innerPayLoad;
- }
-}
-
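ComplexPayload above simply wraps an Event (event time, sequence number, key and payload string) into a serializable value, so it can serve as the STATE type of the builder shown earlier. A hedged sketch of that combination, assuming Kryo serialization for the payload (the state name and the serializer choice are assumptions, not part of this commit):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.tests.general.Event;

// Sketch only: keep the latest ComplexPayload per key, rebuilt from every incoming Event.
ArtificialValueStateBuilder<Event, ComplexPayload> payloadStateBuilder =
    new ArtificialValueStateBuilder<Event, ComplexPayload>(
        "exampleComplexPayloadState",                          // hypothetical state name
        (event, previousPayload) -> new ComplexPayload(event), // replace state with the newest event's payload
        new KryoSerializer<ComplexPayload>(ComplexPayload.class, new ExecutionConfig()));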
http://git-wip-us.apache.org/repos/asf/flink/blob/ff62977d/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index 621eac1..d3b3118 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -23,18 +23,14 @@ if [ -z $1 ] || [ -z $2 ]; then
fi
source "$(dirname "$0")"/common.sh
-source "$(dirname "$0")"/kafka-common.sh
-
-setup_kafka_dist
-start_kafka_cluster
ORIGINAL_DOP=$1
NEW_DOP=$2
if (( $ORIGINAL_DOP >= $NEW_DOP )); then
- NUM_SLOTS=$(( $ORIGINAL_DOP + 1 ))
+ NUM_SLOTS=$ORIGINAL_DOP
else
- NUM_SLOTS=$(( $NEW_DOP + 1 ))
+ NUM_SLOTS=$NEW_DOP
fi
# modify configuration to have enough slots
@@ -55,8 +51,6 @@ function test_cleanup {
# don't call ourselves again for normal exit
trap "" EXIT
- stop_kafka_cluster
-
# revert our modifications to the Flink distribution
mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
@@ -67,42 +61,39 @@ function test_cleanup {
trap test_cleanup INT
trap test_cleanup EXIT
-# create the required topic
-create_kafka_topic 1 1 test-input
-
-# run the state machine example job
-STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $FLINK_DIR/examples/streaming/StateMachineExample.jar \
- --kafka-topic test-input \
- | grep "Job has been submitted with JobID" | sed 's/.* //g')
-
-wait_job_running $STATE_MACHINE_JOB
+CHECKPOINT_DIR="file://$TEST_DATA_DIR/savepoint-e2e-test-chckpt-dir"
-# then, run the events generator
-EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \
- --kafka-topic test-input --sleep 15 \
+# run the DataStream allround job
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-datastream-allround-test/target/DataStreamAllroundTestProgram.jar
+DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.parallelism $ORIGINAL_DOP \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1 \
| grep "Job has been submitted with JobID" | sed 's/.* //g')
-wait_job_running $EVENTS_GEN_JOB
+wait_job_running $DATASTREAM_JOB
-function get_metric_state_machine_processed_records {
- grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
+function get_metric_processed_records {
+ grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
}
function get_num_metric_samples {
- grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
+ grep ".General purpose test job.ArtificalKeyedStateMapper.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
}
# monitor the numRecordsIn metric of the state machine operator;
# only proceed to savepoint when the operator has processed 200 records
while : ; do
- NUM_RECORDS=$(get_metric_state_machine_processed_records)
+ NUM_RECORDS=$(get_metric_processed_records)
if [ -z $NUM_RECORDS ]; then
NUM_RECORDS=0
fi
if (( $NUM_RECORDS < 200 )); then
- echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+ echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
sleep 1
else
break
@@ -110,27 +101,31 @@ while : ; do
done
# take a savepoint of the state machine job
-SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+SAVEPOINT_PATH=$(take_savepoint $DATASTREAM_JOB $TEST_DATA_DIR \
| grep "Savepoint completed. Path:" | sed 's/.* //g')
-cancel_job $STATE_MACHINE_JOB
+cancel_job $DATASTREAM_JOB
# Since it is not possible to differentiate reporter output between the first and second execution,
# we remember the number of metrics sampled in the first execution so that they can be ignored in the following monitorings
OLD_NUM_METRICS=$(get_num_metric_samples)
# resume state machine job with savepoint
-STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
- --kafka-topic test-input \
+DATASTREAM_JOB=$($FLINK_DIR/bin/flink run -s $SAVEPOINT_PATH -p $NEW_DOP -d $TEST_PROGRAM_JAR \
+ --test.semantics exactly-once \
+ --environment.parallelism $NEW_DOP \
+ --state_backend.checkpoint_directory $CHECKPOINT_DIR \
+ --sequence_generator_source.sleep_time 15 \
+ --sequence_generator_source.sleep_after_elements 1 \
| grep "Job has been submitted with JobID" | sed 's/.* //g')
-wait_job_running $STATE_MACHINE_JOB
+wait_job_running $DATASTREAM_JOB
# monitor the numRecordsIn metric of the state machine operator in the second execution
# we let the test finish once the second restore execution has processed 200 records
while : ; do
NUM_METRICS=$(get_num_metric_samples)
- NUM_RECORDS=$(get_metric_state_machine_processed_records)
+ NUM_RECORDS=$(get_metric_processed_records)
# only account for metrics that appeared in the second execution
if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
@@ -138,7 +133,7 @@ while : ; do
fi
if (( $NUM_RECORDS < 200 )); then
- echo "Waiting for state machine job to process up to 200 records, current progress: $NUM_RECORDS records ..."
+ echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
sleep 1
else
break