Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2016/09/09 11:03:28 UTC

[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/2484

    [FLINK-4177] Harden CassandraConnectorTest

    This PR (hopefully) resolves the instability issues with the Cassandra connector tests.
    
    Changelog:
    * updated the Cassandra and driver versions
    * cleaned up `cassandra.yaml`:
     * removed several configuration values that were set to their defaults
     * sorted the remaining settings alphabetically
    * modified the at-least-once sinks to
     * properly log exceptions when close() is called
     * keep track of how many records have not been acknowledged yet
    * modified the tests to
     * start the embedded Cassandra instance in a separate process
       * and supply an array of performance-related JVM arguments, taken from the Cassandra repo
     * no longer truncate tables; instead, every test uses a separate table
     * wait in a retry loop until a connection to Cassandra can be established, instead of waiting for a fixed time
     * no longer run actual Flink jobs
     * use increased timeouts
     * clean up temporary files
     * clean up temporary files

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink cass_tmp

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2484
    
----
commit 64b125343839788530729a2119d1dc92e50e849a
Author: zentol <ch...@apache.org>
Date:   2016-09-05T09:03:00Z

    [FLINK-4177] Harden CassandraConnectorTest

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2484



[GitHub] flink issue #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2484
  
    @StephanEwen I've rebased and updated the PR. The tests now use the `TestJvmProcess` class to set up the Cassandra instance and have a fixed deadline for the initial connection instead of a fixed number of attempts.
    
    I have also removed the `sleep(5000)` statement; I would like to see whether we can get by without it.
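For context, starting a service in a separate JVM mostly comes down to building a command line from the current JVM's `java` binary and classpath, which is what the `getJavaCommandPath()`/`getCurrentClasspath()` calls in the diff in this thread do and what `TestJvmProcess` encapsulates. A rough sketch using only standard system properties; `ChildJvmCommand` is a hypothetical name and not Flink's API:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: command line for a child JVM that reuses the parent's java binary and classpath. */
final class ChildJvmCommand {
    static List<String> build(String mainClass, List<String> jvmArgs, String... programArgs) {
        // java.home is the running JRE/JDK; its bin/java starts a compatible JVM
        String javaBin = System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
        List<String> command = new ArrayList<>();
        command.add(javaBin);
        command.addAll(jvmArgs);                 // e.g. -D properties and -XX tuning flags
        command.add("-classpath");
        command.add(System.getProperty("java.class.path"));
        command.add(mainClass);                  // class whose main() starts the service
        command.addAll(Arrays.asList(programArgs));
        return command;
    }
}
```

The resulting list can be passed to `new ProcessBuilder(command)`; calling `redirectErrorStream(true)` or `inheritIO()` on the builder is one way to keep the child's output visible in the test logs.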



[GitHub] flink issue #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2484
  
    Probably improves test stability. My feeling is that as long as the issue Chesnay described in the discussion about the "sleep(5000)" remains, we will not have full test stability.



[GitHub] flink issue #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the issue:

    https://github.com/apache/flink/pull/2484
  
    Will this pull request fix this build instability? https://s3.amazonaws.com/archive.travis-ci.org/jobs/166508416/log.txt



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r79462622
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    +
    +			// create a logging file for the process
    +			File tempLogFile = File.createTempFile("testlogconfig", "properties");
    +			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
    +
    +			// start the task manager process
    +			String[] command = new String[]{
    +				javaCommand,
    +				"-Dlog.level=DEBUG",
    +				"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
    +				"-Dcassandra.config=" + tmp.toURI(),
    +				// these options were taken directly from the jvm.options file in the cassandra repo
    +				"-XX:+UseThreadPriorities",
    +				"-Xss256k",
    +				"-XX:+AlwaysPreTouch",
    +				"-XX:+UseTLAB",
    +				"-XX:+ResizeTLAB",
    +				"-XX:+UseNUMA",
    +				"-XX:+PerfDisableSharedMem",
    +				"-XX:+UseParNewGC",
    +				"-XX:+UseConcMarkSweepGC",
    +				"-XX:+CMSParallelRemarkEnabled",
    +				"-XX:SurvivorRatio=8",
    +				"-XX:MaxTenuringThreshold=1",
    +				"-XX:CMSInitiatingOccupancyFraction=75",
    +				"-XX:+UseCMSInitiatingOccupancyOnly",
    +				"-XX:CMSWaitDuration=10000",
    +				"-XX:+CMSParallelInitialMarkEnabled",
    +				"-XX:+CMSEdenChunksRecordAlways",
    +				"-XX:+CMSClassUnloadingEnabled",
    +
    +				"-classpath", getCurrentClasspath(),
    +				EmbeddedCassandraService.class.getName()
    +			};
    +
    +			ProcessBuilder bld = new ProcessBuilder(command);
    +			cassandra = bld.start();
    +			sw = new StringWriter();
    +			new PipeForwarder(cassandra.getErrorStream(), sw);
     		}
     
    -		try {
    -			Thread.sleep(1000 * 10);
    -		} catch (InterruptedException e) { //give cassandra a few seconds to start up
    +		int attempt = 0;
    +		while (true) {
    +			try {
    +				attempt++;
    +				cluster = builder.getCluster();
    +				session = cluster.connect();
    +				break;
    +			} catch (Exception e) {
    +				if (attempt > 30) {
    +					throw e;
    +				}
    +				try {
    +					Thread.sleep(1000);
    --- End diff --
    
    It is not guaranteed that this actually waits as long as specified, so a better strategy is a `while` loop with a deadline rather than a fixed number of attempts with a sleep in between.
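A sketch of the suggested deadline loop, using only the JDK. `Retry.untilDeadline` is a hypothetical helper: it measures elapsed time with `System.nanoTime()`, so slow connection attempts and oversleeping both count against one overall budget, which a fixed count of 30 attempts does not guarantee:

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical helper: retries an action until it succeeds or an overall deadline passes. */
final class Retry {
    static <T> T untilDeadline(Callable<T> action, Duration timeout, Duration pause) throws Exception {
        // nanoTime is monotonic, so the deadline is unaffected by wall-clock changes
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // do not swallow interrupts
                throw e;
            } catch (Exception e) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw e; // budget exhausted: surface the last failure
                }
                // never sleep past the deadline, however long the failed attempt took
                long sleepNanos = Math.min(pause.toNanos(), remaining);
                Thread.sleep(sleepNanos / 1_000_000L, (int) (sleepNanos % 1_000_000L));
            }
        }
    }
}
```

In the test, this could wrap the `cluster.connect()` step with, say, a 30-second timeout and a 1-second pause; the exact values are assumptions, not taken from the PR.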



[GitHub] flink issue #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2484
  
    @rmetzger I did not encounter the issue you linked when I ran the tests, so I am cautiously optimistic. As Stephan said, though, I can't guarantee complete stability.



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r80232630
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    --- End diff --
    
    I wasn't aware of it.



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r80238252
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    +
    +			// create a logging file for the process
    +			File tempLogFile = File.createTempFile("testlogconfig", "properties");
    +			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
    +
    +			// start the task manager process
    +			String[] command = new String[]{
    +				javaCommand,
    +				"-Dlog.level=DEBUG",
    +				"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
    +				"-Dcassandra.config=" + tmp.toURI(),
    +				// these options were taken directly from the jvm.options file in the cassandra repo
    +				"-XX:+UseThreadPriorities",
    +				"-Xss256k",
    +				"-XX:+AlwaysPreTouch",
    +				"-XX:+UseTLAB",
    +				"-XX:+ResizeTLAB",
    +				"-XX:+UseNUMA",
    +				"-XX:+PerfDisableSharedMem",
    +				"-XX:+UseParNewGC",
    +				"-XX:+UseConcMarkSweepGC",
    +				"-XX:+CMSParallelRemarkEnabled",
    +				"-XX:SurvivorRatio=8",
    +				"-XX:MaxTenuringThreshold=1",
    +				"-XX:CMSInitiatingOccupancyFraction=75",
    +				"-XX:+UseCMSInitiatingOccupancyOnly",
    +				"-XX:CMSWaitDuration=10000",
    +				"-XX:+CMSParallelInitialMarkEnabled",
    +				"-XX:+CMSEdenChunksRecordAlways",
    +				"-XX:+CMSClassUnloadingEnabled",
    +
    +				"-classpath", getCurrentClasspath(),
    +				EmbeddedCassandraService.class.getName()
    +			};
    +
    +			ProcessBuilder bld = new ProcessBuilder(command);
    +			cassandra = bld.start();
    +			sw = new StringWriter();
    +			new PipeForwarder(cassandra.getErrorStream(), sw);
     		}
     
    -		try {
    -			Thread.sleep(1000 * 10);
    -		} catch (InterruptedException e) { //give cassandra a few seconds to start up
    +		int attempt = 0;
    +		while (true) {
    +			try {
    +				attempt++;
    +				cluster = builder.getCluster();
    +				session = cluster.connect();
    +				break;
    +			} catch (Exception e) {
    +				if (attempt > 30) {
    +					throw e;
    +				}
    +				try {
    +					Thread.sleep(1000);
    +				} catch (InterruptedException e1) {
    +				}
    +			}
     		}
    -
    -		cluster = builder.getCluster();
    -		session = cluster.connect();
    +		LOG.debug("Connection established after " + attempt + " attempts.");
     
     		session.execute(CREATE_KEYSPACE_QUERY);
    -		session.execute(CREATE_TABLE_QUERY);
    -	}
    -
    -	@BeforeClass
    -	public static void startFlink() throws Exception {
    -		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    +		session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial"));
     
    -		flinkCluster = new ForkableFlinkMiniCluster(config);
    -		flinkCluster.start();
    +		try {
    +			Thread.sleep(5000);
    --- End diff --
    
    I tried that, and the results were actually quite interesting.
    
    The last issue I dealt with was that, fairly regularly, the first test to be executed would fail. I could disable one test after another, but the failure would simply move to whichever test was now executed first. I figured that Cassandra might not be ready to receive data yet, so I implemented what you suggested: create a table, write a row, and check whether it was written. That check did not fail even once, but sure enough, the tests still did.



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r79462214
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    --- End diff --
    
    There is the `TestJvmProcess` class that encapsulates much behavior, including log forwarding. Did that not work here?



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r79463411
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    +
    +			// create a logging file for the process
    +			File tempLogFile = File.createTempFile("testlogconfig", "properties");
    +			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
    +
    +			// start the task manager process
    +			String[] command = new String[]{
    +				javaCommand,
    +				"-Dlog.level=DEBUG",
    +				"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
    +				"-Dcassandra.config=" + tmp.toURI(),
    +				// these options were taken directly from the jvm.options file in the cassandra repo
    +				"-XX:+UseThreadPriorities",
    +				"-Xss256k",
    +				"-XX:+AlwaysPreTouch",
    +				"-XX:+UseTLAB",
    +				"-XX:+ResizeTLAB",
    +				"-XX:+UseNUMA",
    +				"-XX:+PerfDisableSharedMem",
    +				"-XX:+UseParNewGC",
    +				"-XX:+UseConcMarkSweepGC",
    +				"-XX:+CMSParallelRemarkEnabled",
    +				"-XX:SurvivorRatio=8",
    +				"-XX:MaxTenuringThreshold=1",
    +				"-XX:CMSInitiatingOccupancyFraction=75",
    +				"-XX:+UseCMSInitiatingOccupancyOnly",
    +				"-XX:CMSWaitDuration=10000",
    +				"-XX:+CMSParallelInitialMarkEnabled",
    +				"-XX:+CMSEdenChunksRecordAlways",
    +				"-XX:+CMSClassUnloadingEnabled",
    +
    +				"-classpath", getCurrentClasspath(),
    +				EmbeddedCassandraService.class.getName()
    +			};
    +
    +			ProcessBuilder bld = new ProcessBuilder(command);
    +			cassandra = bld.start();
    +			sw = new StringWriter();
    +			new PipeForwarder(cassandra.getErrorStream(), sw);
     		}
     
    -		try {
    -			Thread.sleep(1000 * 10);
    -		} catch (InterruptedException e) { //give cassandra a few seconds to start up
    +		int attempt = 0;
    +		while (true) {
    +			try {
    +				attempt++;
    +				cluster = builder.getCluster();
    +				session = cluster.connect();
    +				break;
    +			} catch (Exception e) {
    +				if (attempt > 30) {
    +					throw e;
    +				}
    +				try {
    +					Thread.sleep(1000);
    +				} catch (InterruptedException e1) {
    +				}
    +			}
     		}
    -
    -		cluster = builder.getCluster();
    -		session = cluster.connect();
    +		LOG.debug("Connection established after " + attempt + " attempts.");
     
     		session.execute(CREATE_KEYSPACE_QUERY);
    -		session.execute(CREATE_TABLE_QUERY);
    -	}
    -
    -	@BeforeClass
    -	public static void startFlink() throws Exception {
    -		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    +		session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial"));
     
    -		flinkCluster = new ForkableFlinkMiniCluster(config);
    -		flinkCluster.start();
    +		try {
    +			Thread.sleep(5000);
    --- End diff --
    
    Fixed-time sleeps usually result in a mix of long build times (when the value is large) and instability (when the value is sometimes not large enough). Could we use a polling scheme here that checks whether the statement has completed?



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r80233989
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    +
    +			// create a logging file for the process
    +			File tempLogFile = File.createTempFile("testlogconfig", "properties");
    +			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
    +
    +			// start the task manager process
    +			String[] command = new String[]{
    +				javaCommand,
    +				"-Dlog.level=DEBUG",
    +				"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
    +				"-Dcassandra.config=" + tmp.toURI(),
    +				// these options were taken directly from the jvm.options file in the cassandra repo
    +				"-XX:+UseThreadPriorities",
    +				"-Xss256k",
    +				"-XX:+AlwaysPreTouch",
    +				"-XX:+UseTLAB",
    +				"-XX:+ResizeTLAB",
    +				"-XX:+UseNUMA",
    +				"-XX:+PerfDisableSharedMem",
    +				"-XX:+UseParNewGC",
    +				"-XX:+UseConcMarkSweepGC",
    +				"-XX:+CMSParallelRemarkEnabled",
    +				"-XX:SurvivorRatio=8",
    +				"-XX:MaxTenuringThreshold=1",
    +				"-XX:CMSInitiatingOccupancyFraction=75",
    +				"-XX:+UseCMSInitiatingOccupancyOnly",
    +				"-XX:CMSWaitDuration=10000",
    +				"-XX:+CMSParallelInitialMarkEnabled",
    +				"-XX:+CMSEdenChunksRecordAlways",
    +				"-XX:+CMSClassUnloadingEnabled",
    +
    +				"-classpath", getCurrentClasspath(),
    +				EmbeddedCassandraService.class.getName()
    +			};
    +
    +			ProcessBuilder bld = new ProcessBuilder(command);
    +			cassandra = bld.start();
    +			sw = new StringWriter();
    +			new PipeForwarder(cassandra.getErrorStream(), sw);
     		}
     
    -		try {
    -			Thread.sleep(1000 * 10);
    -		} catch (InterruptedException e) { //give cassandra a few seconds to start up
    +		int attempt = 0;
    +		while (true) {
    +			try {
    +				attempt++;
    +				cluster = builder.getCluster();
    +				session = cluster.connect();
    +				break;
    +			} catch (Exception e) {
    +				if (attempt > 30) {
    +					throw e;
    +				}
    +				try {
    +					Thread.sleep(1000);
    --- End diff --
    
    That's true, I will change it.



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r80235122
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    +
    +			// create a logging file for the process
    +			File tempLogFile = File.createTempFile("testlogconfig", "properties");
    +			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
    +
    +			// start the task manager process
    +			String[] command = new String[]{
    +				javaCommand,
    +				"-Dlog.level=DEBUG",
    +				"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
    +				"-Dcassandra.config=" + tmp.toURI(),
    +				// these options were taken directly from the jvm.options file in the cassandra repo
    +				"-XX:+UseThreadPriorities",
    +				"-Xss256k",
    +				"-XX:+AlwaysPreTouch",
    +				"-XX:+UseTLAB",
    +				"-XX:+ResizeTLAB",
    +				"-XX:+UseNUMA",
    +				"-XX:+PerfDisableSharedMem",
    +				"-XX:+UseParNewGC",
    +				"-XX:+UseConcMarkSweepGC",
    +				"-XX:+CMSParallelRemarkEnabled",
    +				"-XX:SurvivorRatio=8",
    +				"-XX:MaxTenuringThreshold=1",
    +				"-XX:CMSInitiatingOccupancyFraction=75",
    +				"-XX:+UseCMSInitiatingOccupancyOnly",
    +				"-XX:CMSWaitDuration=10000",
    +				"-XX:+CMSParallelInitialMarkEnabled",
    +				"-XX:+CMSEdenChunksRecordAlways",
    +				"-XX:+CMSClassUnloadingEnabled",
    +
    +				"-classpath", getCurrentClasspath(),
    +				EmbeddedCassandraService.class.getName()
    +			};
    +
    +			ProcessBuilder bld = new ProcessBuilder(command);
    +			cassandra = bld.start();
    +			sw = new StringWriter();
    +			new PipeForwarder(cassandra.getErrorStream(), sw);
     		}
     
    -		try {
    -			Thread.sleep(1000 * 10);
    -		} catch (InterruptedException e) { //give cassandra a few seconds to start up
    +		int attempt = 0;
    +		while (true) {
    +			try {
    +				attempt++;
    +				cluster = builder.getCluster();
    +				session = cluster.connect();
    +				break;
    +			} catch (Exception e) {
    +				if (attempt > 30) {
    +					throw e;
    +				}
    +				try {
    +					Thread.sleep(1000);
    +				} catch (InterruptedException e1) {
    +				}
    +			}
     		}
    -
    -		cluster = builder.getCluster();
    -		session = cluster.connect();
    +		LOG.debug("Connection established after " + attempt + " attempts.");
     
     		session.execute(CREATE_KEYSPACE_QUERY);
    -		session.execute(CREATE_TABLE_QUERY);
    -	}
    -
    -	@BeforeClass
    -	public static void startFlink() throws Exception {
    -		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    +		session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial"));
     
    -		flinkCluster = new ForkableFlinkMiniCluster(config);
    -		flinkCluster.start();
    +		try {
    +			Thread.sleep(5000);
    --- End diff --
    
    Can this not be modeled by repeatedly attempting to execute operations on the table? Or is the remaining work not guaranteed to succeed, even if the first table operations succeed?
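A sketch of the round-trip probe suggested here: write a uniquely keyed row, read it back, and retry until a deadline. `ProbeStore` is a hypothetical interface standing in for the Cassandra session; in practice `write` would run an INSERT and `read` a SELECT against a probe table. Note that elsewhere in this thread zentol reports that a check like this never failed, yet the first test still did:

```java
import java.time.Duration;
import java.util.UUID;

/** Hypothetical minimal view of the database that the probe needs. */
interface ProbeStore {
    void write(String key, String value) throws Exception;

    /** Returns the stored value, or null if the key is absent. */
    String read(String key) throws Exception;
}

final class ReadinessProbe {
    /** Returns true once a freshly written row can be read back, false if the timeout expires first. */
    static boolean awaitRoundTrip(ProbeStore store, Duration timeout, Duration pause) throws InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        final String key = "probe-" + UUID.randomUUID(); // unique key, so stale rows cannot fake success
        do {
            try {
                store.write(key, key);
                if (key.equals(store.read(key))) {
                    return true;
                }
            } catch (Exception e) {
                // any failure is treated as "not ready yet"
            }
            Thread.sleep(pause.toMillis());
        } while (System.nanoTime() < deadline);
        return false;
    }
}
```

Because every error counts as "not ready yet", the probe only distinguishes ready from not-ready-before-the-deadline; it cannot explain why later operations might still fail.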



[GitHub] flink pull request #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2484#discussion_r80233334
  
    --- Diff: flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---
    @@ -178,89 +166,122 @@ public static void startCassandra() throws IOException {
     		}
     		scanner.close();
     
    -
    -		// Tell cassandra where the configuration files are.
    -		// Use the test configuration file.
    -		System.setProperty("cassandra.config", tmp.getAbsoluteFile().toURI().toString());
    -
     		if (EMBEDDED) {
    -			cassandra = new EmbeddedCassandraService();
    -			cassandra.start();
    +			String javaCommand = getJavaCommandPath();
    +
    +			// create a logging file for the process
    +			File tempLogFile = File.createTempFile("testlogconfig", "properties");
    +			CommonTestUtils.printLog4jDebugConfig(tempLogFile);
    +
    +			// start the task manager process
    +			String[] command = new String[]{
    +				javaCommand,
    +				"-Dlog.level=DEBUG",
    +				"-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
    +				"-Dcassandra.config=" + tmp.toURI(),
    +				// these options were taken directly from the jvm.options file in the cassandra repo
    +				"-XX:+UseThreadPriorities",
    +				"-Xss256k",
    +				"-XX:+AlwaysPreTouch",
    +				"-XX:+UseTLAB",
    +				"-XX:+ResizeTLAB",
    +				"-XX:+UseNUMA",
    +				"-XX:+PerfDisableSharedMem",
    +				"-XX:+UseParNewGC",
    +				"-XX:+UseConcMarkSweepGC",
    +				"-XX:+CMSParallelRemarkEnabled",
    +				"-XX:SurvivorRatio=8",
    +				"-XX:MaxTenuringThreshold=1",
    +				"-XX:CMSInitiatingOccupancyFraction=75",
    +				"-XX:+UseCMSInitiatingOccupancyOnly",
    +				"-XX:CMSWaitDuration=10000",
    +				"-XX:+CMSParallelInitialMarkEnabled",
    +				"-XX:+CMSEdenChunksRecordAlways",
    +				"-XX:+CMSClassUnloadingEnabled",
    +
    +				"-classpath", getCurrentClasspath(),
    +				EmbeddedCassandraService.class.getName()
    +			};
    +
    +			ProcessBuilder bld = new ProcessBuilder(command);
    +			cassandra = bld.start();
    +			sw = new StringWriter();
    +			new PipeForwarder(cassandra.getErrorStream(), sw);
     		}
     
    -		try {
    -			Thread.sleep(1000 * 10);
    -		} catch (InterruptedException e) { //give cassandra a few seconds to start up
    +		int attempt = 0;
    +		while (true) {
    +			try {
    +				attempt++;
    +				cluster = builder.getCluster();
    +				session = cluster.connect();
    +				break;
    +			} catch (Exception e) {
    +				if (attempt > 30) {
    +					throw e;
    +				}
    +				try {
    +					Thread.sleep(1000);
    +				} catch (InterruptedException e1) {
    +				}
    +			}
     		}
    -
    -		cluster = builder.getCluster();
    -		session = cluster.connect();
    +		LOG.debug("Connection established after " + attempt + " attempts.");
     
     		session.execute(CREATE_KEYSPACE_QUERY);
    -		session.execute(CREATE_TABLE_QUERY);
    -	}
    -
    -	@BeforeClass
    -	public static void startFlink() throws Exception {
    -		Configuration config = new Configuration();
    -		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
    +		session.execute(CREATE_TABLE_QUERY.replace("$TABLE", "flink_initial"));
     
    -		flinkCluster = new ForkableFlinkMiniCluster(config);
    -		flinkCluster.start();
    +		try {
    +			Thread.sleep(5000);
    --- End diff --
    
    The execute() call is synchronous, so this is not about making sure that it has completed. I simply found that giving Cassandra a bit of time after creating the keyspace and the first table resulted in higher stability.



[GitHub] flink issue #2484: [FLINK-4177] Harden CassandraConnectorTest

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/2484
  
    merging

