You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/11/17 22:50:12 UTC
[1/2] incubator-flink git commit: [tests] Various stability fixes to tests
Repository: incubator-flink
Updated Branches:
refs/heads/master f76eb15b6 -> ae07abe37
[tests] Various stability fixes to tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/06c259f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/06c259f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/06c259f6
Branch: refs/heads/master
Commit: 06c259f641ffcdb4e524370f85c4c9b12dcdfc28
Parents: f76eb15
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 17 21:55:51 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 17 21:55:51 2014 +0100
----------------------------------------------------------------------
.../librarycache/BlobLibraryCacheManager.java | 6 ++++++
.../flink/runtime/jobmanager/JobManager.java | 1 +
.../librarycache/BlobLibraryCacheManagerTest.java | 18 +++++++++++++++---
.../scheduler/SchedulerIsolatedTasksTest.java | 12 +++++++++++-
4 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/06c259f6/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
index 70b60fe..8a0f1f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java
@@ -209,6 +209,12 @@ public final class BlobLibraryCacheManager extends TimerTask implements LibraryC
}
}
+ int getNumberOfCachedLibraries() {
+ synchronized (lockObject) {
+ return blobKeyReferenceCounters.size();
+ }
+ }
+
private URL registerReferenceToBlobKeyAndGetURL(BlobKey key) throws IOException {
Integer references = blobKeyReferenceCounters.get(key);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/06c259f6/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 7fb4f94..95287d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -263,6 +263,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
} catch (InterruptedException e) {
LOG.debug("Shutdown of executor thread pool interrupted", e);
}
+ this.executorService.shutdownNow();
}
// Stop instance manager
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/06c259f6/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
index 67dd8a8..0cf3e02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManagerTest.java
@@ -46,7 +46,7 @@ public class BlobLibraryCacheManagerTest {
JobID jid = new JobID();
List<BlobKey> keys = new ArrayList<BlobKey>();
BlobServer server = null;
- LibraryCacheManager libraryCacheManager = null;
+ BlobLibraryCacheManager libraryCacheManager = null;
final byte[] buf = new byte[128];
@@ -73,7 +73,19 @@ public class BlobLibraryCacheManagerTest {
libraryCacheManager.unregisterJob(jid);
- Thread.sleep(1500);
+ // because we cannot guarantee that there are not thread races in the build system, we
+ // loop for a certain while until the references disappear
+ {
+ long deadline = System.currentTimeMillis() + 30000;
+ do {
+ Thread.sleep(500);
+ }
+ while (libraryCacheManager.getNumberOfCachedLibraries() > 0 &&
+ System.currentTimeMillis() < deadline);
+ }
+
+ // this fails if we exited via a timeout
+ assertEquals(0, libraryCacheManager.getNumberOfCachedLibraries());
int caughtExceptions = 0;
@@ -90,7 +102,7 @@ public class BlobLibraryCacheManagerTest {
bc.close();
}
- catch(Exception e){
+ catch (Exception e){
e.printStackTrace();
fail(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/06c259f6/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
index 34e9f68..ad040f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerIsolatedTasksTest.java
@@ -33,7 +33,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.instance.AllocatedSlot;
@@ -181,10 +183,12 @@ public class SchedulerIsolatedTasksTest {
final int NUM_SLOTS_PER_INSTANCE = 3;
final int NUM_TASKS_TO_SCHEDULE = 2000;
+ final ExecutorService executor = Executors.newFixedThreadPool(4, ExecutorThreadFactory.INSTANCE);
+
try {
// note: since this test asynchronously releases slots, the executor needs release workers.
// doing the release call synchronous can lead to a deadlock
- Scheduler scheduler = new Scheduler(Executors.newFixedThreadPool(4, ExecutorThreadFactory.INSTANCE));
+ Scheduler scheduler = new Scheduler(executor);
for (int i = 0;i < NUM_INSTANCES; i++) {
scheduler.newInstanceAvailable(getRandomInstance((int) (Math.random() * NUM_SLOTS_PER_INSTANCE) + 1));
@@ -262,12 +266,18 @@ public class SchedulerIsolatedTasksTest {
// the slots should all be different
assertTrue(areAllDistinct(slotsAfter.toArray()));
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+
assertEquals(totalSlots, scheduler.getNumberOfAvailableSlots());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
+ finally {
+ executor.shutdownNow();
+ }
}
@Test
[2/2] incubator-flink git commit: [build] Manage version of joda-time to prevent conflicts between dependencies.
Posted by se...@apache.org.
[build] Manage version of joda-time to prevent conflicts between dependencies.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/ae07abe3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/ae07abe3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/ae07abe3
Branch: refs/heads/master
Commit: ae07abe3781eb583774eab917953f31cf4925cd9
Parents: 06c259f
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Nov 17 22:07:09 2014 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Nov 17 22:07:09 2014 +0100
----------------------------------------------------------------------
pom.xml | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/ae07abe3/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2547279..35b70f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -175,7 +175,7 @@ under the License.
<artifactId>jetty-util</artifactId>
<version>8.0.0.M1</version>
</dependency>
-
+
<!-- Make sure we use a consistent avro version throughout the project -->
<dependency>
<groupId>org.apache.avro</groupId>
@@ -208,7 +208,14 @@ under the License.
<artifactId>javassist</artifactId>
<version>3.18.1-GA</version>
</dependency>
-
+
+ <!-- joda time is pulled in different versions by different transitive dependencies-->
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.5</version>
+ </dependency>
+
<!-- stax is pulled in different versions by different transitive dependencies-->
<dependency>
<groupId>stax</groupId>