You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/02/25 23:23:02 UTC
reef git commit: [REEF-1006] Remove TimeoutSubject and tests from Wake
Repository: reef
Updated Branches:
refs/heads/master 72fd9de12 -> 3056022be
[REEF-1006] Remove TimeoutSubject and tests from Wake
`TimeoutSubject` is deprecated in REEF-535 of Release 0.14.
This change removes TimeoutSubject.java and TimeoutSubjectTest.java in reef-wake.
JIRA:
[REEF-1006](https://issues.apache.org/jira/browse/REEF-1006)
Pull Request:
This closes #861
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3056022b
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3056022b
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3056022b
Branch: refs/heads/master
Commit: 3056022be5325cffcd158bc0934bcc72abb2b74b
Parents: 72fd9de
Author: Dongjoon Hyun <do...@apache.org>
Authored: Thu Feb 25 13:19:30 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Thu Feb 25 14:21:36 2016 -0800
----------------------------------------------------------------------
.../reef/wake/rx/impl/TimeoutSubject.java | 93 ------------
.../reef/wake/test/rx/TimeoutSubjectTest.java | 146 -------------------
2 files changed, 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/3056022b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
deleted file mode 100644
index 89bba0f..0000000
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.wake.rx.impl;
-
-import org.apache.reef.wake.rx.Observer;
-import org.apache.reef.wake.rx.Subject;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * A class implementing {@code Subject<T>} with timeout.
- *
- * @param <T> a type of subject
- * @deprecated in 0.14 as unused
- */
-@Deprecated
-public class TimeoutSubject<T> implements Subject<T, T> {
- private Thread timeBomb;
- private Observer<T> destination;
- private boolean finished;
-
- public TimeoutSubject(final long timeout, final Observer<T> handler) {
- this.finished = false;
- this.destination = handler;
- final TimeoutSubject<T> outer = this;
- this.timeBomb = new Thread(new Runnable() {
- @Override
- public void run() {
- final boolean finishedCopy;
- synchronized (outer) {
- if (!finished) {
- try {
- outer.wait(timeout);
- } catch (final InterruptedException e) {
- return;
- }
- }
- finishedCopy = finished;
- finished = true; // lock out the caller from putting event through now
- }
- if (!finishedCopy) {
- destination.onError(new TimeoutException("TimeoutSubject expired"));
- }
- }
- });
- this.timeBomb.start();
- }
-
- @Override
- public void onNext(final T value) {
- final boolean wasFinished;
- synchronized (this) {
- wasFinished = finished;
- if (!finished) {
- this.notify();
- finished = true;
- }
- }
- if (!wasFinished) {
- // TODO[JIRA unneeded due to deprecation]: change Subject to specify conversion to T
- destination.onNext(value);
- destination.onCompleted();
- }
- }
-
- @Override
- public void onError(final Exception error) {
- this.timeBomb.interrupt();
- destination.onError(error);
- }
-
- @Override
- public void onCompleted() {
- throw new IllegalStateException("Should not be called directly");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/3056022b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java
deleted file mode 100644
index c433b81..0000000
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.wake.test.rx;
-
-import org.apache.reef.wake.rx.Observer;
-import org.apache.reef.wake.rx.Subject;
-import org.apache.reef.wake.rx.impl.TimeoutSubject;
-import org.junit.Test;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for TimeoutSubject.
- */
-public class TimeoutSubjectTest {
-
- @Test
- public void testSuccess() {
- final AtomicInteger nexts = new AtomicInteger(0);
- final AtomicInteger completes = new AtomicInteger(0);
- final int delta = 400;
- final Subject<Integer, Integer> dut = new TimeoutSubject<>(10000, new Observer<Integer>() {
-
- @Override
- public void onNext(final Integer value) {
- nexts.addAndGet(delta);
- }
-
- @Override
- public void onError(final Exception error) {
- fail(error.toString());
- }
-
- @Override
- public void onCompleted() {
- assertEquals(delta, nexts.get());
- completes.incrementAndGet();
- }
- });
- dut.onNext(delta);
-
- assertEquals(delta, nexts.get());
- assertEquals(1, completes.get());
- }
-
- @Test
- public void testDifferentThread() {
- final AtomicInteger nexts = new AtomicInteger(0);
- final AtomicInteger completes = new AtomicInteger(0);
- final int delta = 400;
- final Subject<Integer, Integer> dut = new TimeoutSubject<>(10000, new Observer<Integer>() {
-
- @Override
- public void onNext(final Integer value) {
- nexts.addAndGet(delta);
- }
-
- @Override
- public void onError(final Exception error) {
- fail(error.toString());
- }
-
- @Override
- public void onCompleted() {
- assertEquals(delta, nexts.get());
- completes.incrementAndGet();
- }
- });
-
- final ExecutorService e = Executors.newSingleThreadExecutor();
- e.submit(new Runnable() {
- @Override
- public void run() {
- dut.onNext(delta);
- }
- });
-
- e.shutdown();
- try {
- e.awaitTermination(11000, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e1) {
- e1.printStackTrace();
- fail(e1.toString());
- }
-
- assertEquals(delta, nexts.get());
- assertEquals(1, completes.get());
- }
-
- @Test
- public void testTimeout() {
- final int timeout = 1;
- final int sleep = 500;
- final AtomicInteger errors = new AtomicInteger(0);
- final Subject<Integer, Integer> dut = new TimeoutSubject<>(timeout, new Observer<Integer>() {
-
- @Override
- public void onNext(final Integer value) {
- fail("Should not get called");
- }
-
- @Override
- public void onError(final Exception error) {
- assertTrue(error instanceof TimeoutException);
- errors.incrementAndGet();
- }
-
- @Override
- public void onCompleted() {
- fail("Should not get called");
- }
- });
-
- try {
- Thread.sleep(sleep);
- } catch (final InterruptedException e) {
- e.printStackTrace();
- fail(e.toString());
- }
- dut.onNext(0xC0FFEE);
-
- assertEquals(1, errors.get());
- }
-}