You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@reef.apache.org by ma...@apache.org on 2016/02/25 23:23:02 UTC

reef git commit: [REEF-1006] Remove TimeoutSubject and tests from Wake

Repository: reef
Updated Branches:
  refs/heads/master 72fd9de12 -> 3056022be


[REEF-1006] Remove TimeoutSubject and tests from Wake

`TimeoutSubject` was deprecated in Release 0.14 (REEF-535).
This change removes TimeoutSubject.java and TimeoutSubjectTest.java from reef-wake.

JIRA:
  [REEF-1006](https://issues.apache.org/jira/browse/REEF-1006)

Pull Request:
  This closes #861


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/3056022b
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/3056022b
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/3056022b

Branch: refs/heads/master
Commit: 3056022be5325cffcd158bc0934bcc72abb2b74b
Parents: 72fd9de
Author: Dongjoon Hyun <do...@apache.org>
Authored: Thu Feb 25 13:19:30 2016 -0800
Committer: Mariia Mykhailova <ma...@apache.org>
Committed: Thu Feb 25 14:21:36 2016 -0800

----------------------------------------------------------------------
 .../reef/wake/rx/impl/TimeoutSubject.java       |  93 ------------
 .../reef/wake/test/rx/TimeoutSubjectTest.java   | 146 -------------------
 2 files changed, 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/3056022b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
deleted file mode 100644
index 89bba0f..0000000
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.wake.rx.impl;
-
-import org.apache.reef.wake.rx.Observer;
-import org.apache.reef.wake.rx.Subject;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * A class implementing {@code Subject<T>} with timeout.
- *
- * @param <T> a type of subject
- * @deprecated in 0.14 as unused
- */
-@Deprecated
-public class TimeoutSubject<T> implements Subject<T, T> {
-  private Thread timeBomb;
-  private Observer<T> destination;
-  private boolean finished;
-
-  public TimeoutSubject(final long timeout, final Observer<T> handler) {
-    this.finished = false;
-    this.destination = handler;
-    final TimeoutSubject<T> outer = this;
-    this.timeBomb = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        final boolean finishedCopy;
-        synchronized (outer) {
-          if (!finished) {
-            try {
-              outer.wait(timeout);
-            } catch (final InterruptedException e) {
-              return;
-            }
-          }
-          finishedCopy = finished;
-          finished = true; // lock out the caller from putting event through now
-        }
-        if (!finishedCopy) {
-          destination.onError(new TimeoutException("TimeoutSubject expired"));
-        }
-      }
-    });
-    this.timeBomb.start();
-  }
-
-  @Override
-  public void onNext(final T value) {
-    final boolean wasFinished;
-    synchronized (this) {
-      wasFinished = finished;
-      if (!finished) {
-        this.notify();
-        finished = true;
-      }
-    }
-    if (!wasFinished) {
-      // TODO[JIRA unneeded due to deprecation]: change Subject to specify conversion to T
-      destination.onNext(value);
-      destination.onCompleted();
-    }
-  }
-
-  @Override
-  public void onError(final Exception error) {
-    this.timeBomb.interrupt();
-    destination.onError(error);
-  }
-
-  @Override
-  public void onCompleted() {
-    throw new IllegalStateException("Should not be called directly");
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/3056022b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java
----------------------------------------------------------------------
diff --git a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java b/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java
deleted file mode 100644
index c433b81..0000000
--- a/lang/java/reef-wake/wake/src/test/java/org/apache/reef/wake/test/rx/TimeoutSubjectTest.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.wake.test.rx;
-
-import org.apache.reef.wake.rx.Observer;
-import org.apache.reef.wake.rx.Subject;
-import org.apache.reef.wake.rx.impl.TimeoutSubject;
-import org.junit.Test;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for TimeoutSubject.
- */
-public class TimeoutSubjectTest {
-
-  @Test
-  public void testSuccess() {
-    final AtomicInteger nexts = new AtomicInteger(0);
-    final AtomicInteger completes = new AtomicInteger(0);
-    final int delta = 400;
-    final Subject<Integer, Integer> dut = new TimeoutSubject<>(10000, new Observer<Integer>() {
-
-      @Override
-      public void onNext(final Integer value) {
-        nexts.addAndGet(delta);
-      }
-
-      @Override
-      public void onError(final Exception error) {
-        fail(error.toString());
-      }
-
-      @Override
-      public void onCompleted() {
-        assertEquals(delta, nexts.get());
-        completes.incrementAndGet();
-      }
-    });
-    dut.onNext(delta);
-
-    assertEquals(delta, nexts.get());
-    assertEquals(1, completes.get());
-  }
-
-  @Test
-  public void testDifferentThread() {
-    final AtomicInteger nexts = new AtomicInteger(0);
-    final AtomicInteger completes = new AtomicInteger(0);
-    final int delta = 400;
-    final Subject<Integer, Integer> dut = new TimeoutSubject<>(10000, new Observer<Integer>() {
-
-      @Override
-      public void onNext(final Integer value) {
-        nexts.addAndGet(delta);
-      }
-
-      @Override
-      public void onError(final Exception error) {
-        fail(error.toString());
-      }
-
-      @Override
-      public void onCompleted() {
-        assertEquals(delta, nexts.get());
-        completes.incrementAndGet();
-      }
-    });
-
-    final ExecutorService e = Executors.newSingleThreadExecutor();
-    e.submit(new Runnable() {
-      @Override
-      public void run() {
-        dut.onNext(delta);
-      }
-    });
-
-    e.shutdown();
-    try {
-      e.awaitTermination(11000, TimeUnit.MILLISECONDS);
-    } catch (final InterruptedException e1) {
-      e1.printStackTrace();
-      fail(e1.toString());
-    }
-
-    assertEquals(delta, nexts.get());
-    assertEquals(1, completes.get());
-  }
-
-  @Test
-  public void testTimeout() {
-    final int timeout = 1;
-    final int sleep = 500;
-    final AtomicInteger errors = new AtomicInteger(0);
-    final Subject<Integer, Integer> dut = new TimeoutSubject<>(timeout, new Observer<Integer>() {
-
-      @Override
-      public void onNext(final Integer value) {
-        fail("Should not get called");
-      }
-
-      @Override
-      public void onError(final Exception error) {
-        assertTrue(error instanceof TimeoutException);
-        errors.incrementAndGet();
-      }
-
-      @Override
-      public void onCompleted() {
-        fail("Should not get called");
-      }
-    });
-
-    try {
-      Thread.sleep(sleep);
-    } catch (final InterruptedException e) {
-      e.printStackTrace();
-      fail(e.toString());
-    }
-    dut.onNext(0xC0FFEE);
-
-    assertEquals(1, errors.get());
-  }
-}