You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:39 UTC

[22/52] [abbrv] incubator-omid git commit: Move com.yahoo -> org.apache

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java b/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java
deleted file mode 100644
index e6ff7f1..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractScheduledService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yahoo.omid.tso.TSOStateManager.TSOState;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Encompasses all the required elements to control the leases required for
- * identifying the master instance when running multiple TSO instances for HA
- * It delegates the initialization of the TSO state and the publication of
- * the instance information when getting the lease to an asynchronous task to
- * continue managing the leases without interruptions.
- */
-class LeaseManager extends AbstractScheduledService implements LeaseManagement {
-
-    private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class);
-
-    private final CuratorFramework zkClient;
-
-    private final Panicker panicker;
-
-    private final String tsoHostAndPort;
-
-    private final TSOStateManager stateManager;
-    private final ExecutorService tsoStateInitializer = Executors.newSingleThreadExecutor(
-            new ThreadFactoryBuilder()
-                    .setNameFormat("tso-state-initializer")
-                    .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
-                        @Override
-                        public void uncaughtException(Thread t, Throwable e) {
-                            panicker.panic(t + " threw exception", e);
-                        }
-                    })
-                    .build());
-
-
-    private final long leasePeriodInMs;
-    private final TSOChannelHandler tsoChannelHandler;
-    private int leaseNodeVersion;
-    private final AtomicLong endLeaseInMs = new AtomicLong(0L);
-    private final AtomicLong baseTimeInMs = new AtomicLong(0L);
-
-    private final String leasePath;
-    private final String currentTSOPath;
-
-    LeaseManager(String tsoHostAndPort,
-                 TSOChannelHandler tsoChannelHandler,
-                 TSOStateManager stateManager,
-                 long leasePeriodInMs,
-                 String leasePath,
-                 String currentTSOPath,
-                 CuratorFramework zkClient,
-                 Panicker panicker) {
-
-        this.tsoHostAndPort = tsoHostAndPort;
-        this.tsoChannelHandler = tsoChannelHandler;
-        this.stateManager = stateManager;
-        this.leasePeriodInMs = leasePeriodInMs;
-        this.leasePath = leasePath;
-        this.currentTSOPath = currentTSOPath;
-        this.zkClient = zkClient;
-        this.panicker = panicker;
-        LOG.info("LeaseManager {} initialized. Lease period {}ms", toString(), leasePeriodInMs);
-
-    }
-
-    // ----------------------------------------------------------------------------------------------------------------
-    // LeaseManagement implementation
-    // ----------------------------------------------------------------------------------------------------------------
-
-    @Override
-    public void startService() throws LeaseManagementException {
-        createLeaseManagementZNode();
-        createCurrentTSOZNode();
-        startAndWait();
-    }
-
-    @Override
-    public void stopService() throws LeaseManagementException {
-        stopAndWait();
-    }
-
-    @Override
-    public boolean stillInLeasePeriod() {
-        return System.currentTimeMillis() <= getEndLeaseInMs();
-    }
-
-    // ----------------------------------------------------------------------------------------------------------------
-    // End LeaseManagement implementation
-    // ----------------------------------------------------------------------------------------------------------------
-
-    void tryToGetInitialLeasePeriod() throws Exception {
-        baseTimeInMs.set(System.currentTimeMillis());
-        if (canAcquireLease()) {
-            endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
-            LOG.info("{} got the lease (Master) Ver. {}/End of lease: {}ms", tsoHostAndPort,
-                    leaseNodeVersion, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(endLeaseInMs));
-            tsoStateInitializer.submit(new Runnable() {
-                // TSO State initialization
-                @Override
-                public void run() {
-                    try {
-                        TSOState newTSOState = stateManager.reset();
-                        advertiseTSOServerInfoThroughZK(newTSOState.getEpoch());
-                        tsoChannelHandler.reconnect();
-                    } catch (Exception e) {
-                        Thread t = Thread.currentThread();
-                        t.getUncaughtExceptionHandler().uncaughtException(t, e);
-                    }
-                }
-            });
-        } else {
-            tsoStateInitializer.submit(new Runnable() {
-                // TSO State initialization
-                @Override
-                public void run() {
-                    // In case the TSO was paused close the connection
-                    tsoChannelHandler.closeConnection();
-                }
-            });
-        }
-    }
-
-    void tryToRenewLeasePeriod() throws Exception {
-        baseTimeInMs.set(System.currentTimeMillis());
-        if (canAcquireLease()) {
-            if (System.currentTimeMillis() > getEndLeaseInMs()) {
-                endLeaseInMs.set(0L);
-                LOG.warn("{} expired lease! Releasing lease to start Master re-election", tsoHostAndPort);
-                tsoChannelHandler.closeConnection();
-            } else {
-                endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
-                LOG.trace("{} renewed lease: Version {}/End of lease at {}ms",
-                        tsoHostAndPort, leaseNodeVersion, endLeaseInMs);
-            }
-        } else {
-            endLeaseInMs.set(0L);
-            LOG.warn("{} lost the lease (Ver. {})! Other instance is now Master",
-                    tsoHostAndPort, leaseNodeVersion);
-            tsoChannelHandler.closeConnection();
-        }
-    }
-
-    private boolean haveLease() {
-        return stillInLeasePeriod();
-    }
-
-    private long getEndLeaseInMs() {
-        return endLeaseInMs.get();
-    }
-
-    private boolean canAcquireLease() throws Exception {
-        try {
-            int previousLeaseNodeVersion = leaseNodeVersion;
-            final byte[] instanceInfo = tsoHostAndPort.getBytes(Charsets.UTF_8);
-            // Try to acquire the lease
-            Stat stat = zkClient.setData().withVersion(previousLeaseNodeVersion)
-                    .forPath(leasePath, instanceInfo);
-            leaseNodeVersion = stat.getVersion();
-            LOG.trace("{} got new lease version {}", tsoHostAndPort, leaseNodeVersion);
-        } catch (KeeperException.BadVersionException e) {
-            return false;
-        }
-        return true;
-    }
-
-    // ----------------------------------------------------------------------------------------------------------------
-    // AbstractScheduledService implementation
-    // ----------------------------------------------------------------------------------------------------------------
-
-    @Override
-    protected void startUp() {
-    }
-
-    @Override
-    protected void shutDown() {
-        try {
-            tsoChannelHandler.close();
-            LOG.info("Channel handler closed");
-        } catch (IOException e) {
-            LOG.error("Error closing TSOChannelHandler", e);
-        }
-    }
-
-    @Override
-    protected void runOneIteration() throws Exception {
-
-        if (!haveLease()) {
-            tryToGetInitialLeasePeriod();
-        } else {
-            tryToRenewLeasePeriod();
-        }
-
-    }
-
-    @Override
-    protected Scheduler scheduler() {
-
-        final long guardLeasePeriodInMs = leasePeriodInMs / 4;
-
-        return new AbstractScheduledService.CustomScheduler() {
-
-            @Override
-            protected Schedule getNextSchedule() throws Exception {
-                if (!haveLease()) {
-                    // Get the current node version...
-                    Stat stat = zkClient.checkExists().forPath(leasePath);
-                    leaseNodeVersion = stat.getVersion();
-                    LOG.trace("{} will try to get lease (with Ver. {}) in {}ms", tsoHostAndPort, leaseNodeVersion,
-                            leasePeriodInMs);
-                    // ...and wait the lease period
-                    return new Schedule(leasePeriodInMs, TimeUnit.MILLISECONDS);
-                } else {
-                    long waitTimeInMs = getEndLeaseInMs() - System.currentTimeMillis() - guardLeasePeriodInMs;
-                    LOG.trace("{} will try to renew lease (with Ver. {}) in {}ms", tsoHostAndPort,
-                            leaseNodeVersion, waitTimeInMs);
-                    return new Schedule(waitTimeInMs, TimeUnit.MILLISECONDS);
-                }
-            }
-        };
-
-    }
-
-    // ----------------------------------------------------------------------------------------------------------------
-    // Helper methods
-    // ----------------------------------------------------------------------------------------------------------------
-
-    @Override
-    public String toString() {
-        return tsoHostAndPort;
-    }
-
-    private void createLeaseManagementZNode() throws LeaseManagementException {
-        try {
-            validateZKPath(leasePath);
-        } catch (Exception e) {
-            throw new LeaseManagementException("Error creating Lease Management ZNode", e);
-        }
-    }
-
-    private void createCurrentTSOZNode() throws LeaseManagementException {
-        try {
-            validateZKPath(currentTSOPath);
-        } catch (Exception e) {
-            throw new LeaseManagementException("Error creating TSO ZNode", e);
-        }
-    }
-
-    private void validateZKPath(String zkPath) throws Exception {
-        EnsurePath path = zkClient.newNamespaceAwareEnsurePath(zkPath);
-        path.ensure(zkClient.getZookeeperClient());
-        Stat stat = zkClient.checkExists().forPath(zkPath);
-        Preconditions.checkNotNull(stat);
-        LOG.info("Path {} ensured", path.getPath());
-    }
-
-    private void advertiseTSOServerInfoThroughZK(long epoch) throws Exception {
-
-        Stat previousTSOZNodeStat = new Stat();
-        byte[] previousTSOInfoAsBytes = zkClient.getData().storingStatIn(previousTSOZNodeStat).forPath(currentTSOPath);
-        if (previousTSOInfoAsBytes != null && !new String(previousTSOInfoAsBytes, Charsets.UTF_8).isEmpty()) {
-            String previousTSOInfo = new String(previousTSOInfoAsBytes, Charsets.UTF_8);
-            String[] previousTSOAndEpochArray = previousTSOInfo.split("#");
-            Preconditions.checkArgument(previousTSOAndEpochArray.length == 2, "Incorrect TSO Info found: ", previousTSOInfo);
-            long oldEpoch = Long.parseLong(previousTSOAndEpochArray[1]);
-            if (oldEpoch > epoch) {
-                throw new LeaseManagementException("Another TSO replica was found " + previousTSOInfo);
-            }
-        }
-        String tsoInfoAsString = tsoHostAndPort + "#" + Long.toString(epoch);
-        byte[] tsoInfoAsBytes = tsoInfoAsString.getBytes(Charsets.UTF_8);
-        zkClient.setData().withVersion(previousTSOZNodeStat.getVersion()).forPath(currentTSOPath, tsoInfoAsBytes);
-        LOG.info("TSO instance {} (Epoch {}) advertised through ZK", tsoHostAndPort, epoch);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java b/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java
deleted file mode 100644
index e140d50..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import java.util.Arrays;
-
-/**
- * Fixed-capacity, set-associative cache from {@code long} keys to
- * {@code long} values, backed by a single flat array.
- * <p>
- * Entries are stored as consecutive (key, value) pairs. A key maps to a
- * starting slot and may live in any of the {@code associativity} slots
- * starting there.
- */
-public class LongCache implements Cache {
-
-    public static final long RESET_VALUE = 0L;
-
-    private final long[] cache;
-    private final int size;
-    private final int associativity;
-
-    /**
-     * @param size          number of starting slots keys are mapped to
-     * @param associativity number of consecutive slots probed for each key
-     */
-    public LongCache(int size, int associativity) {
-        this.size = size;
-        // Two longs (key, value) per slot; the extra 'associativity' slots let
-        // the probe window of the last starting slot stay inside the array
-        this.cache = new long[2 * (size + associativity)];
-        this.associativity = associativity;
-    }
-
-    @Override
-    public void reset() {
-        Arrays.fill(cache, RESET_VALUE);
-    }
-
-    /**
-     * Stores {@code value} under {@code key}.
-     * <p>
-     * If the key is already present in its probe window its value is
-     * overwritten. Otherwise the slot of the window holding the smallest
-     * value (the last one on ties) is replaced.
-     *
-     * @return 0 if the key was already present, otherwise the value
-     *         previously held by the replaced slot
-     * @see com.yahoo.omid.tso.Cache#set(long, long)
-     */
-    @Override
-    public long set(long key, long value) {
-        final int index = index(key);
-        int oldestIndex = 0;
-        long oldestValue = Long.MAX_VALUE;
-        for (int i = 0; i < associativity; ++i) {
-            int currIndex = 2 * (index + i);
-            if (cache[currIndex] == key) {
-                oldestValue = 0;
-                oldestIndex = currIndex;
-                break;
-            }
-            if (cache[currIndex + 1] <= oldestValue) {
-                oldestValue = cache[currIndex + 1];
-                oldestIndex = currIndex;
-            }
-        }
-        cache[oldestIndex] = key;
-        cache[oldestIndex + 1] = value;
-        return oldestValue;
-    }
-
-    /**
-     * @return the value stored under {@code key}, or {@link #RESET_VALUE} if
-     *         the key is not found in its probe window
-     * @see com.yahoo.omid.tso.Cache#get(long)
-     */
-    @Override
-    public long get(long key) {
-        final int index = index(key);
-        for (int i = 0; i < associativity; ++i) {
-            int currIndex = 2 * (index + i);
-            if (cache[currIndex] == key) {
-                return cache[currIndex + 1];
-            }
-        }
-        return RESET_VALUE;
-    }
-
-    /**
-     * Maps a key to its starting slot in {@code [0, size)}.
-     */
-    private int index(long hash) {
-        // Reduce before taking the absolute value: Math.abs(Long.MIN_VALUE) is
-        // still negative and would produce a negative array index. For every
-        // other key this yields the same slot as Math.abs(hash) % size.
-        return (int) Math.abs(hash % size);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java b/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java
deleted file mode 100644
index aec6141..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link Panicker} implementation that only logs the panic at error level.
- */
-public class MockPanicker implements Panicker {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MockPanicker.class);
-
-    /**
-     * Logs the reason and the cause of the panic; nothing else is done.
-     *
-     * @param reason text describing why the panic was raised
-     * @param cause  throwable associated with the panic
-     */
-    @Override
-    public void panic(final String reason, final Throwable cause) {
-        LOG.error("PANICKING: {}", reason, cause);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java b/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java
deleted file mode 100644
index c52e96a..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-
-/**
- * Collects named time measurements and publishes them as timers to a
- * {@link MetricsRegistry}.
- * <p>
- * A measurement is taken with {@link #timerStart(String)} followed by
- * {@link #timerStop(String)}; {@link #publish()} reports every finished
- * measurement under the {@code tso} metric prefix.
- */
-@NotThreadSafe
-public class MonitoringContext {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MonitoringContext.class);
-
-    // Set by publish(); timerStop() calls arriving afterwards are ignored
-    private volatile boolean published;
-    // Finished measurements in nanoseconds, keyed by timer name
-    private Map<String, Long> elapsedNanosByName = new HashMap<>();
-    // Timers started and not yet stopped, keyed by timer name
-    private Map<String, Stopwatch> runningTimers = new ConcurrentHashMap<>();
-    private MetricsRegistry metrics;
-
-    public MonitoringContext(MetricsRegistry metrics) {
-        this.metrics = metrics;
-    }
-
-    /**
-     * Starts a new timer under the given name, replacing any running timer
-     * with the same name.
-     */
-    public void timerStart(String name) {
-        runningTimers.put(name, new Stopwatch().start());
-    }
-
-    /**
-     * Stops the timer with the given name and records its elapsed time.
-     *
-     * @throws IllegalStateException if no timer with that name is running
-     */
-    public void timerStop(String name) {
-        if (published) {
-            LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception()));
-            return;
-        }
-        Stopwatch stopwatch = runningTimers.get(name);
-        if (stopwatch == null) {
-            throw new IllegalStateException(
-                    String.format("There is no %s timer in the %s monitoring context.", name, this));
-        }
-        stopwatch.stop();
-        elapsedNanosByName.put(name, stopwatch.elapsedTime(TimeUnit.NANOSECONDS));
-        runningTimers.remove(name);
-    }
-
-    /**
-     * Marks this context as published and reports every recorded measurement
-     * to the metrics registry.
-     */
-    public void publish() {
-        published = true;
-        for (Map.Entry<String, Long> measurement : elapsedNanosByName.entrySet()) {
-            String timerName = measurement.getKey();
-            Long durationInNs = measurement.getValue();
-            metrics.timer(name("tso", timerName)).update(durationInNs);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java b/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java
deleted file mode 100644
index 7a32f1e..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.net.HostAndPort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Enumeration;
-
-/**
- * Helpers to derive the address a TSO instance is reachable at from a named
- * network interface.
- */
-final public class NetworkInterfaceUtils {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NetworkInterfaceUtils.class);
-
-    /**
-     * Returns an <code>InetAddress</code> object encapsulating what is most
-     * likely the machine's LAN IP address.
-     * <p/>
-     * This method is intended for use as a replacement of JDK method
-     * <code>InetAddress.getLocalHost</code>, because that method is ambiguous
-     * on Linux systems. Linux systems enumerate the loopback network
-     * interface the same way as regular LAN network interfaces, but the JDK
-     * <code>InetAddress.getLocalHost</code> method does not specify the
-     * algorithm used to select the address returned under such circumstances,
-     * and will often return the loopback address, which is not valid for
-     * network communication. Details
-     * <a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4665037">here</a>.
-     * <p/>
-     * This method will scan all IP addresses on a particular network interface
-     * specified as parameter on the host machine to determine the IP address
-     * most likely to be the machine's LAN address. If the machine has multiple
-     * IP addresses, this method will prefer a site-local IP address (e.g.
-     * 192.168.x.x or 10.10.x.x, usually IPv4) if the machine has one (and will
-     * return the first site-local address if the machine has more than one),
-     * but if the machine does not hold a site-local address, this method will
-     * return simply the first non-loopback address found (IPv4 or IPv6).
-     * <p/>
-     * If this method cannot find a non-loopback address using this selection
-     * algorithm, it will fall back to calling and returning the result of JDK
-     * method <code>InetAddress.getLocalHost()</code>.
-     * <p/>
-     *
-     * @param ifaceName
-     *             The name of the network interface to extract the IP address
-     *             from
-     * @return the selected address; never null
-     * @throws IllegalArgumentException
-     *             If no network interface with the given name is found.
-     * @throws SocketException
-     *             If the network interface lookup fails with an I/O error.
-     * @throws UnknownHostException
-     *             If the LAN address of the machine cannot be found.
-     */
-    static InetAddress getIPAddressFromNetworkInterface(String ifaceName)
-            throws SocketException, UnknownHostException {
-
-        NetworkInterface iface = NetworkInterface.getByName(ifaceName);
-        if (iface == null) {
-            throw new IllegalArgumentException(
-                    "Network interface " + ifaceName + " not found");
-        }
-
-        InetAddress candidateAddress = null;
-        Enumeration<InetAddress> inetAddrs = iface.getInetAddresses();
-        while (inetAddrs.hasMoreElements()) {
-            InetAddress inetAddr = inetAddrs.nextElement();
-            if (!inetAddr.isLoopbackAddress()) {
-                if (inetAddr.isSiteLocalAddress()) {
-                    return inetAddr; // Return non-loopback site-local address
-                } else if (candidateAddress == null) {
-                    // Found non-loopback address, but not necessarily site-local
-                    candidateAddress = inetAddr;
-                }
-            }
-        }
-
-        if (candidateAddress != null) {
-            // Site-local address not found, but found other non-loopback addr
-            // Server might have a non-site-local address assigned to its NIC
-            // (or might be running IPv6 which deprecates "site-local" concept)
-            return candidateAddress;
-        }
-
-        // At this point, we did not find a non-loopback address.
-        // Fall back to returning whatever InetAddress.getLocalHost() returns
-        InetAddress jdkSuppliedAddress = InetAddress.getLocalHost();
-        if (jdkSuppliedAddress == null) {
-            throw new UnknownHostException(
-                    "InetAddress.getLocalHost() unexpectedly returned null.");
-        }
-        return jdkSuppliedAddress;
-    }
-
-    /**
-     * Builds the {@code host:port} string of a TSO instance from the network
-     * interface name and the port found in the given configuration.
-     *
-     * @param config configuration providing the network interface name and the port
-     * @return the host address and port rendered by {@link HostAndPort}
-     * @throws IllegalArgumentException
-     *             If the interface is not found or host and port cannot be
-     *             combined into a valid host:port.
-     */
-    public static String getTSOHostAndPort(TSOServerConfig config) throws SocketException, UnknownHostException {
-
-        // Build TSO host:port string and validate it
-        final String tsoNetIfaceName = config.getNetworkIfaceName();
-        InetAddress addr = getIPAddressFromNetworkInterface(tsoNetIfaceName);
-        final int tsoPort = config.getPort();
-
-        try {
-            return HostAndPort.fromParts(addr.getHostAddress(), tsoPort).toString();
-        } catch (IllegalArgumentException e) {
-            // Log the actual inputs: no host:port string exists when fromParts() fails
-            LOG.error("Cannot build TSO host:port string from host {} and port {}", addr.getHostAddress(), tsoPort);
-            throw e;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java b/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java
deleted file mode 100644
index 47860ae..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-/**
- * Receives notifications of failures reported as a panic.
- */
-public interface Panicker {
-
-    /**
-     * Reports a panic.
-     *
-     * @param reason text describing why the panic is raised
-     * @param cause  throwable associated with the panic
-     */
-    void panic(String reason, Throwable cause);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java b/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java
deleted file mode 100644
index 98be866..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.timestamp.storage.TimestampStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.io.IOException;
-
-public class PausableTimestampOracle extends TimestampOracleImpl {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PausableTimestampOracle.class);
-
-    private volatile boolean tsoPaused = false;
-
-    @Inject
-    public PausableTimestampOracle(MetricsRegistry metrics,
-                                   TimestampStorage tsStorage,
-                                   Panicker panicker) throws IOException {
-        super(metrics, tsStorage, panicker);
-    }
-
-    @Override
-    public long next() throws IOException {
-        while (tsoPaused) {
-            synchronized (this) {
-                try {
-                    this.wait();
-                } catch (InterruptedException e) {
-                    LOG.error("Interrupted whilst paused");
-                    Thread.currentThread().interrupt();
-                }
-            }
-        }
-        return super.next();
-    }
-
-    public synchronized void pause() {
-        tsoPaused = true;
-        this.notifyAll();
-    }
-
-    public synchronized void resume() {
-        tsoPaused = false;
-        this.notifyAll();
-    }
-
-    public boolean isTSOPaused() {
-        return tsoPaused;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java
deleted file mode 100644
index 4aa7b51..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-interface PersistenceProcessor {
-    void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx);
-
-    void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx);
-
-    void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx);
-
-    void persistLowWatermark(long lowWatermark);
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java
deleted file mode 100644
index 2b46b86..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutHandler;
-import com.yahoo.omid.committable.CommitTable;
-import com.yahoo.omid.metrics.Histogram;
-import com.yahoo.omid.metrics.Meter;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.metrics.Timer;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-import static com.yahoo.omid.tso.TSOServer.TSO_HOST_AND_PORT_KEY;
-
-class PersistenceProcessorImpl
-        implements EventHandler<PersistenceProcessorImpl.PersistEvent>, PersistenceProcessor, TimeoutHandler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessor.class);
-
-    private final String tsoHostAndPort;
-    private final LeaseManagement leaseManager;
-    final ReplyProcessor reply;
-    final RetryProcessor retryProc;
-    final CommitTable.Client commitTableClient;
-    final CommitTable.Writer writer;
-    final Panicker panicker;
-    final RingBuffer<PersistEvent> persistRing;
-    final Batch batch;
-    final Timer flushTimer;
-    final Histogram batchSizeHistogram;
-    final Meter timeoutMeter;
-    final int batchPersistTimeoutInMs;
-
-    long lastFlush = System.nanoTime();
-
-    @Inject
-    PersistenceProcessorImpl(TSOServerConfig config,
-                             MetricsRegistry metrics,
-                             @Named(TSO_HOST_AND_PORT_KEY) String tsoHostAndPort,
-                             LeaseManagement leaseManager,
-                             CommitTable commitTable,
-                             ReplyProcessor reply,
-                             RetryProcessor retryProc,
-                             Panicker panicker)
-            throws IOException {
-
-        this(config,
-             metrics,
-             tsoHostAndPort,
-             new Batch(config.getMaxBatchSize()),
-             leaseManager,
-             commitTable,
-             reply,
-             retryProc,
-             panicker);
-
-    }
-
-    @VisibleForTesting
-    PersistenceProcessorImpl(TSOServerConfig config,
-                             MetricsRegistry metrics,
-                             String tsoHostAndPort,
-                             Batch batch,
-                             LeaseManagement leaseManager,
-                             CommitTable commitTable,
-                             ReplyProcessor reply,
-                             RetryProcessor retryProc,
-                             Panicker panicker)
-            throws IOException {
-
-        this.tsoHostAndPort = tsoHostAndPort;
-        this.batch = batch;
-        this.batchPersistTimeoutInMs = config.getBatchPersistTimeoutInMs();
-        this.leaseManager = leaseManager;
-        this.commitTableClient = commitTable.getClient();
-        this.writer = commitTable.getWriter();
-        this.reply = reply;
-        this.retryProc = retryProc;
-        this.panicker = panicker;
-
-        LOG.info("Creating the persist processor with batch size {}, and timeout {}ms",
-                 config.getMaxBatchSize(), batchPersistTimeoutInMs);
-
-        flushTimer = metrics.timer(name("tso", "persist", "flush"));
-        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
-        timeoutMeter = metrics.meter(name("tso", "persist", "timeout"));
-
-        // FIXME consider putting something more like a phased strategy here to avoid
-        // all the syscalls
-        final TimeoutBlockingWaitStrategy timeoutStrategy
-                = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
-
-        persistRing = RingBuffer.createSingleProducer(
-                PersistEvent.EVENT_FACTORY, 1 << 20, timeoutStrategy); // 2^20 entries in ringbuffer
-        SequenceBarrier persistSequenceBarrier = persistRing.newBarrier();
-        BatchEventProcessor<PersistEvent> persistProcessor = new BatchEventProcessor<>(
-                persistRing,
-                persistSequenceBarrier,
-                this);
-        persistRing.addGatingSequences(persistProcessor.getSequence());
-        persistProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
-        ExecutorService persistExec = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("persist-%d").build());
-        persistExec.submit(persistProcessor);
-
-    }
-
-    @Override
-    public void onEvent(PersistEvent event, long sequence, boolean endOfBatch) throws Exception {
-
-        switch (event.getType()) {
-            case COMMIT:
-                event.getMonCtx().timerStart("commitPersistProcessor");
-                // TODO: What happens when the IOException is thrown?
-                writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
-                batch.addCommit(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(),
-                                event.getMonCtx());
-                break;
-            case ABORT:
-                sendAbortOrIdentifyFalsePositive(event.getStartTimestamp(), event.isRetry(), event.getChannel(),
-                                                 event.getMonCtx());
-                break;
-            case TIMESTAMP:
-                event.getMonCtx().timerStart("timestampPersistProcessor");
-                batch.addTimestamp(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
-                break;
-        }
-        if (batch.isFull() || endOfBatch) {
-            maybeFlushBatch();
-        }
-
-    }
-
-    private void sendAbortOrIdentifyFalsePositive(long startTimestamp, boolean isRetry, Channel channel,
-                                                  MonitoringContext monCtx) {
-
-        if (!isRetry) {
-            reply.abortResponse(startTimestamp, channel, monCtx);
-            return;
-        }
-
-        // If it is a retry, we must check whether this abort is for a request that
-        // was already committed. This can happen because a client could have missed
-        // the reply, so it retried the request after a timeout. So we add it to the
-        // batch and, when it's flushed, we'll add events to the retry processor in
-        // order to check for false positive aborts. It needs to be done after the
-        // flush in case the commit has occurred but it hasn't been persisted yet.
-        batch.addUndecidedRetriedRequest(startTimestamp, channel, monCtx);
-    }
-
-    // no event has been received in the timeout period
-    @Override
-    public void onTimeout(final long sequence) {
-        maybeFlushBatch();
-    }
-
-    /**
-     * Flush the current batch if it's full, or the timeout has elapsed since the last flush.
-     */
-    private void maybeFlushBatch() {
-        if (batch.isFull()) {
-            flush();
-        } else if ((System.nanoTime() - lastFlush) > TimeUnit.MILLISECONDS.toNanos(batchPersistTimeoutInMs)) {
-            timeoutMeter.mark();
-            flush();
-        }
-    }
-
-    @VisibleForTesting
-    synchronized void flush() {
-        lastFlush = System.nanoTime();
-
-        boolean areWeStillMaster = true;
-        if (!leaseManager.stillInLeasePeriod()) {
-            // The master TSO replica has changed, so we must inform the
-            // clients about it when sending the replies and avoid flushing
-            // the current batch of TXs
-            areWeStillMaster = false;
-            // We need also to clear the data in the buffer
-            writer.clearWriteBuffer();
-            LOG.trace("Replica {} lost mastership before flushing data", tsoHostAndPort);
-        } else {
-            try {
-                writer.flush();
-            } catch (IOException e) {
-                panicker.panic("Error persisting commit batch", e.getCause());
-            }
-            batchSizeHistogram.update(batch.getNumEvents());
-            if (!leaseManager.stillInLeasePeriod()) {
-                // If after flushing this TSO server is not the master
-                // replica we need to inform the client about it
-                areWeStillMaster = false;
-                LOG.warn("Replica {} lost mastership after flushing data", tsoHostAndPort);
-            }
-        }
-        flushTimer.update((System.nanoTime() - lastFlush));
-        batch.sendRepliesAndReset(reply, retryProc, areWeStillMaster);
-
-    }
-
-    @Override
-    public void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = persistRing.next();
-        PersistEvent e = persistRing.get(seq);
-        PersistEvent.makePersistCommit(e, startTimestamp, commitTimestamp, c, monCtx);
-        persistRing.publish(seq);
-    }
-
-    @Override
-    public void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) {
-        long seq = persistRing.next();
-        PersistEvent e = persistRing.get(seq);
-        PersistEvent.makePersistAbort(e, startTimestamp, isRetry, c, monCtx);
-        persistRing.publish(seq);
-    }
-
-    @Override
-    public void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = persistRing.next();
-        PersistEvent e = persistRing.get(seq);
-        PersistEvent.makePersistTimestamp(e, startTimestamp, c, monCtx);
-        persistRing.publish(seq);
-    }
-
-    @Override
-    public void persistLowWatermark(long lowWatermark) {
-        try {
-            writer.updateLowWatermark(lowWatermark);
-        } catch (IOException e) {
-            LOG.error("Should not be thrown");
-        }
-    }
-
-    public static class Batch {
-
-        final PersistEvent[] events;
-        final int maxBatchSize;
-        int numEvents;
-
-        Batch(int maxBatchSize) {
-            assert (maxBatchSize > 0);
-            this.maxBatchSize = maxBatchSize;
-            events = new PersistEvent[maxBatchSize];
-            numEvents = 0;
-            for (int i = 0; i < maxBatchSize; i++) {
-                events[i] = new PersistEvent();
-            }
-        }
-
-        boolean isFull() {
-            assert (numEvents <= maxBatchSize);
-            return numEvents == maxBatchSize;
-        }
-
-        int getNumEvents() {
-            return numEvents;
-        }
-
-        void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
-            if (isFull()) {
-                throw new IllegalStateException("batch full");
-            }
-            int index = numEvents++;
-            PersistEvent e = events[index];
-            PersistEvent.makePersistCommit(e, startTimestamp, commitTimestamp, c, monCtx);
-        }
-
-        void addUndecidedRetriedRequest(long startTimestamp, Channel c, MonitoringContext monCtx) {
-            if (isFull()) {
-                throw new IllegalStateException("batch full");
-            }
-            int index = numEvents++;
-            PersistEvent e = events[index];
-            // We mark the event as an ABORT retry to identify the events to send
-            // to the retry processor
-            PersistEvent.makePersistAbort(e, startTimestamp, true, c, monCtx);
-        }
-
-        void addTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
-            if (isFull()) {
-                throw new IllegalStateException("batch full");
-            }
-            int index = numEvents++;
-            PersistEvent e = events[index];
-            PersistEvent.makePersistTimestamp(e, startTimestamp, c, monCtx);
-        }
-
-        void sendRepliesAndReset(ReplyProcessor reply, RetryProcessor retryProc, boolean isTSOInstanceMaster) {
-            for (int i = 0; i < numEvents; i++) {
-                PersistEvent e = events[i];
-                switch (e.getType()) {
-                    case TIMESTAMP:
-                        e.getMonCtx().timerStop("timestampPersistProcessor");
-                        reply.timestampResponse(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
-                        break;
-                    case COMMIT:
-                        e.getMonCtx().timerStop("commitPersistProcessor");
-                        if (isTSOInstanceMaster) {
-                            reply.commitResponse(false, e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(),
-                                                 e.getMonCtx());
-                        } else {
-                            // The client will need to perform heuristic actions to determine the output
-                            reply.commitResponse(true, e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(),
-                                                 e.getMonCtx());
-                        }
-                        break;
-                    case ABORT:
-                        if (e.isRetry()) {
-                            retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(),
-                                                                            e.getMonCtx());
-                        } else {
-                            LOG.error("We should not be receiving non-retried aborted requests in here");
-                        }
-                        break;
-                    default:
-                        LOG.error("We should receive only COMMIT or ABORT event types. Received {}", e.getType());
-                        break;
-                }
-            }
-            numEvents = 0;
-        }
-
-    }
-
-    public final static class PersistEvent {
-
-        private MonitoringContext monCtx;
-
-        enum Type {
-            TIMESTAMP, COMMIT, ABORT
-        }
-
-        private Type type = null;
-        private Channel channel = null;
-
-        private boolean isRetry = false;
-        private long startTimestamp = 0;
-        private long commitTimestamp = 0;
-
-        static void makePersistCommit(PersistEvent e, long startTimestamp, long commitTimestamp, Channel c,
-                                      MonitoringContext monCtx) {
-            e.type = Type.COMMIT;
-            e.startTimestamp = startTimestamp;
-            e.commitTimestamp = commitTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makePersistAbort(PersistEvent e, long startTimestamp, boolean isRetry, Channel c,
-                                     MonitoringContext monCtx) {
-            e.type = Type.ABORT;
-            e.startTimestamp = startTimestamp;
-            e.isRetry = isRetry;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makePersistTimestamp(PersistEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
-            e.type = Type.TIMESTAMP;
-            e.startTimestamp = startTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        MonitoringContext getMonCtx() {
-            return monCtx;
-        }
-
-        Type getType() {
-            return type;
-        }
-
-        Channel getChannel() {
-            return channel;
-        }
-
-        boolean isRetry() {
-            return isRetry;
-        }
-
-        long getStartTimestamp() {
-            return startTimestamp;
-        }
-
-        long getCommitTimestamp() {
-            return commitTimestamp;
-        }
-
-        public final static EventFactory<PersistEvent> EVENT_FACTORY = new EventFactory<PersistEvent>() {
-            @Override
-            public PersistEvent newInstance() {
-                return new PersistEvent();
-            }
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java
deleted file mode 100644
index f0788ed..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-interface ReplyProcessor {
-    /**
-     * Informs the client about the outcome of the Tx it was trying to
-     * commit. If the heuristic decision flag is enabled, the client
-     * will need to do additional actions for learning the final outcome.
-     *
-     * @param makeHeuristicDecision
-     *            informs about whether heuristic actions are needed or not
-     * @param startTimestamp
-     *            the start timestamp of the transaction (a.k.a. tx id)
-     * @param commitTimestamp
-     *            the commit timestamp of the transaction
-     * @param channel
-     *            the communication channel with the client
-     */
-    void commitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
-
-    void abortResponse(long startTimestamp, Channel c, MonitoringContext monCtx);
-
-    void timestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx);
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java
deleted file mode 100644
index a69700d..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.BusySpinWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.yahoo.omid.metrics.Meter;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.proto.TSOProto;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-
-class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>, ReplyProcessor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
-
-    final RingBuffer<ReplyEvent> replyRing;
-    final Meter abortMeter;
-    final Meter commitMeter;
-    final Meter timestampMeter;
-
-    @Inject
-    ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker) {
-        replyRing = RingBuffer.<ReplyEvent>createMultiProducer(ReplyEvent.EVENT_FACTORY, 1 << 12,
-                new BusySpinWaitStrategy());
-        SequenceBarrier replySequenceBarrier = replyRing.newBarrier();
-        BatchEventProcessor<ReplyEvent> replyProcessor = new BatchEventProcessor<ReplyEvent>(
-                replyRing, replySequenceBarrier, this);
-        replyProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
-        replyRing.addGatingSequences(replyProcessor.getSequence());
-
-        ExecutorService replyExec = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("reply-%d").build());
-        replyExec.submit(replyProcessor);
-
-        abortMeter = metrics.meter(name("tso", "aborts"));
-        commitMeter = metrics.meter(name("tso", "commits"));
-        timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
-    }
-
-    public void onEvent(ReplyEvent event, long sequence, boolean endOfBatch) throws Exception {
-        String name = null;
-        try {
-            switch (event.getType()) {
-                case COMMIT:
-                    name = "commitReplyProcessor";
-                    event.getMonCtx().timerStart(name);
-                    handleCommitResponse(false, event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
-                    break;
-                case HEURISTIC_COMMIT:
-                    name = "commitReplyProcessor";
-                    event.getMonCtx().timerStart(name);
-                    handleCommitResponse(true, event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
-                    break;
-                case ABORT:
-                    name = "abortReplyProcessor";
-                    event.getMonCtx().timerStart(name);
-                    handleAbortResponse(event.getStartTimestamp(), event.getChannel());
-                    break;
-                case TIMESTAMP:
-                    name = "timestampReplyProcessor";
-                    event.getMonCtx().timerStart(name);
-                    handleTimestampResponse(event.getStartTimestamp(), event.getChannel());
-                    break;
-                default:
-                    LOG.error("Unknown event {}", event.getType());
-                    break;
-            }
-        } finally {
-            if (name != null) {
-                event.getMonCtx().timerStop(name);
-            }
-        }
-        event.getMonCtx().publish();
-    }
-
-    @Override
-    public void commitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = replyRing.next();
-        ReplyEvent e = replyRing.get(seq);
-        ReplyEvent.makeCommitResponse(makeHeuristicDecision, e, startTimestamp, commitTimestamp, c, monCtx);
-        replyRing.publish(seq);
-    }
-
-    @Override
-    public void abortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = replyRing.next();
-        ReplyEvent e = replyRing.get(seq);
-        ReplyEvent.makeAbortResponse(e, startTimestamp, c, monCtx);
-        replyRing.publish(seq);
-    }
-
-    @Override
-    public void timestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = replyRing.next();
-        ReplyEvent e = replyRing.get(seq);
-        ReplyEvent.makeTimestampReponse(e, startTimestamp, c, monCtx);
-        replyRing.publish(seq);
-    }
-
-    void handleCommitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel c) {
-        TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
-        TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
-        if (makeHeuristicDecision) { // If the commit is ambiguous is due to a new master TSO
-            commitBuilder.setMakeHeuristicDecision(true);
-        }
-        commitBuilder.setAborted(false)
-                .setStartTimestamp(startTimestamp)
-                .setCommitTimestamp(commitTimestamp);
-        builder.setCommitResponse(commitBuilder.build());
-        c.write(builder.build());
-
-        commitMeter.mark();
-    }
-
-    void handleAbortResponse(long startTimestamp, Channel c) {
-        TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
-        TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
-        commitBuilder.setAborted(true)
-                .setStartTimestamp(startTimestamp);
-        builder.setCommitResponse(commitBuilder.build());
-        c.write(builder.build());
-
-        abortMeter.mark();
-    }
-
-    void handleTimestampResponse(long startTimestamp, Channel c) {
-        TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
-        TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
-        respBuilder.setStartTimestamp(startTimestamp);
-        builder.setTimestampResponse(respBuilder.build());
-        c.write(builder.build());
-
-        timestampMeter.mark();
-    }
-
-    public final static class ReplyEvent {
-
-        enum Type {
-            TIMESTAMP, COMMIT, HEURISTIC_COMMIT, ABORT
-        }
-
-        private Type type = null;
-        private Channel channel = null;
-
-        private long startTimestamp = 0;
-        private long commitTimestamp = 0;
-        private MonitoringContext monCtx;
-
-        Type getType() {
-            return type;
-        }
-
-        Channel getChannel() {
-            return channel;
-        }
-
-        long getStartTimestamp() {
-            return startTimestamp;
-        }
-
-        long getCommitTimestamp() {
-            return commitTimestamp;
-        }
-
-        MonitoringContext getMonCtx() {
-            return monCtx;
-        }
-
-        static void makeTimestampReponse(ReplyEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
-            e.type = Type.TIMESTAMP;
-            e.startTimestamp = startTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makeCommitResponse(boolean makeHeuristicDecision, ReplyEvent e, long startTimestamp,
-                                       long commitTimestamp, Channel c, MonitoringContext monCtx) {
-
-            if (makeHeuristicDecision) {
-                e.type = Type.HEURISTIC_COMMIT;
-            } else {
-                e.type = Type.COMMIT;
-            }
-            e.startTimestamp = startTimestamp;
-            e.commitTimestamp = commitTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makeAbortResponse(ReplyEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
-            e.type = Type.ABORT;
-            e.startTimestamp = startTimestamp;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        public final static EventFactory<ReplyEvent> EVENT_FACTORY = new EventFactory<ReplyEvent>() {
-            @Override
-            public ReplyEvent newInstance() {
-                return new ReplyEvent();
-            }
-        };
-
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java
deleted file mode 100644
index 2870cb7..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-import java.util.Collection;
-
-// NOTE: public is required explicitly in the interface definition for Guice injection
-public interface RequestProcessor extends TSOStateManager.StateObserver {
-
-    void timestampRequest(Channel c, MonitoringContext monCtx);
-
-    void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c, MonitoringContext monCtx);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java
deleted file mode 100644
index 6194b2e..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.BusySpinWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.tso.TSOStateManager.TSOState;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
-
-    private final TimestampOracle timestampOracle;
-    public final CommitHashMap hashmap;
-    private final MetricsRegistry metrics;
-    private final PersistenceProcessor persistProc;
-    private final RingBuffer<RequestEvent> requestRing;
-    private long lowWatermark = -1L;
-    private long epoch = -1L;
-
-    @Inject
-    RequestProcessorImpl(TSOServerConfig config,
-                         MetricsRegistry metrics,
-                         TimestampOracle timestampOracle,
-                         PersistenceProcessor persistProc,
-                         Panicker panicker) throws IOException {
-
-        this.metrics = metrics;
-
-        this.persistProc = persistProc;
-        this.timestampOracle = timestampOracle;
-
-        this.hashmap = new CommitHashMap(config.getMaxItems());
-
-        // Set up the disruptor thread
-        requestRing = RingBuffer.<RequestEvent>createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12,
-                new BusySpinWaitStrategy());
-        SequenceBarrier requestSequenceBarrier = requestRing.newBarrier();
-        BatchEventProcessor<RequestEvent> requestProcessor =
-                new BatchEventProcessor<RequestEvent>(requestRing,
-                        requestSequenceBarrier,
-                        this);
-        requestRing.addGatingSequences(requestProcessor.getSequence());
-        requestProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
-        ExecutorService requestExec = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("request-%d").build());
-        // Each processor runs on a separate thread
-        requestExec.submit(requestProcessor);
-
-    }
-
-    /**
-     * This should be called when the TSO gets initialized or gets leadership
-     */
-    @Override
-    public void update(TSOState state) throws IOException {
-        LOG.info("Reseting RequestProcessor...");
-        this.lowWatermark = state.getLowWatermark();
-        persistProc.persistLowWatermark(lowWatermark);
-        this.epoch = state.getEpoch();
-        hashmap.reset();
-        LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, epoch);
-    }
-
-    @Override
-    public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
-        String name = null;
-        try {
-            if (event.getType() == RequestEvent.Type.TIMESTAMP) {
-                name = "timestampReqProcessor";
-                event.getMonCtx().timerStart(name);
-                handleTimestamp(event);
-            } else if (event.getType() == RequestEvent.Type.COMMIT) {
-                name = "commitReqProcessor";
-                event.getMonCtx().timerStart(name);
-                handleCommit(event);
-            }
-        } finally {
-            if (name != null) {
-                event.getMonCtx().timerStop(name);
-            }
-        }
-
-    }
-
-    @Override
-    public void timestampRequest(Channel c, MonitoringContext monCtx) {
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeTimestampRequest(e, c, monCtx);
-        requestRing.publish(seq);
-    }
-
-    @Override
-    public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c, MonitoringContext monCtx) {
-        long seq = requestRing.next();
-        RequestEvent e = requestRing.get(seq);
-        RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
-        requestRing.publish(seq);
-    }
-
-    public void handleTimestamp(RequestEvent requestEvent) {
-        long timestamp;
-
-        try {
-            timestamp = timestampOracle.next();
-        } catch (IOException e) {
-            LOG.error("Error getting timestamp", e);
-            return;
-        }
-
-        persistProc.persistTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
-    }
-
-    public long handleCommit(RequestEvent event) {
-        long startTimestamp = event.getStartTimestamp();
-        Iterable<Long> writeSet = event.writeSet();
-        boolean isRetry = event.isRetry();
-        Channel c = event.getChannel();
-
-        boolean committed = false;
-        long commitTimestamp = 0L;
-
-        int numCellsInWriteset = 0;
-        // 0. check if it should abort
-        if (startTimestamp <= lowWatermark) {
-            committed = false;
-        } else {
-            // 1. check the write-write conflicts
-            committed = true;
-            for (long cellId : writeSet) {
-                long value = hashmap.getLatestWriteForCell(cellId);
-                if (value != 0 && value >= startTimestamp) {
-                    committed = false;
-                    break;
-                }
-                numCellsInWriteset++;
-            }
-        }
-
-        if (committed) {
-            // 2. commit
-            try {
-                commitTimestamp = timestampOracle.next();
-
-                if (numCellsInWriteset > 0) {
-                    long newLowWatermark = lowWatermark;
-
-                    for (long r : writeSet) {
-                        long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
-                        newLowWatermark = Math.max(removed, newLowWatermark);
-                    }
-
-                    if (newLowWatermark != lowWatermark) {
-                        LOG.trace("Setting new low Watermark to {}", newLowWatermark);
-                        lowWatermark = newLowWatermark;
-                        persistProc.persistLowWatermark(newLowWatermark);
-                    }
-                }
-                persistProc.persistCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
-            } catch (IOException e) {
-                LOG.error("Error committing", e);
-            }
-        } else { // add it to the aborted list
-            persistProc.persistAbort(startTimestamp, isRetry, c, event.getMonCtx());
-        }
-
-        return commitTimestamp;
-    }
-
-    final static class RequestEvent implements Iterable<Long> {
-
-        enum Type {
-            TIMESTAMP, COMMIT
-        }
-
-        ;
-
-        private Type type = null;
-        private Channel channel = null;
-
-        private boolean isRetry = false;
-        private long startTimestamp = 0;
-        private MonitoringContext monCtx;
-        private long numCells = 0;
-
-        private static final int MAX_INLINE = 40;
-        private Long writeSet[] = new Long[MAX_INLINE];
-        private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
-
-        static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
-            e.type = Type.TIMESTAMP;
-            e.channel = c;
-            e.monCtx = monCtx;
-        }
-
-        static void makeCommitRequest(RequestEvent e,
-                                      long startTimestamp, MonitoringContext monCtx, Collection<Long> writeSet,
-                                      boolean isRetry, Channel c) {
-            e.monCtx = monCtx;
-            e.type = Type.COMMIT;
-            e.channel = c;
-            e.startTimestamp = startTimestamp;
-            e.isRetry = isRetry;
-            if (writeSet.size() > MAX_INLINE) {
-                e.numCells = writeSet.size();
-                e.writeSetAsCollection = writeSet;
-            } else {
-                e.writeSetAsCollection = null;
-                e.numCells = writeSet.size();
-                int i = 0;
-                for (Long cellId : writeSet) {
-                    e.writeSet[i] = cellId;
-                    i++;
-                }
-            }
-        }
-
-        MonitoringContext getMonCtx() {
-            return monCtx;
-        }
-
-        Type getType() {
-            return type;
-        }
-
-        long getStartTimestamp() {
-            return startTimestamp;
-        }
-
-        Channel getChannel() {
-            return channel;
-        }
-
-        @Override
-        public Iterator<Long> iterator() {
-            if (writeSetAsCollection != null) {
-                return writeSetAsCollection.iterator();
-            }
-            return new Iterator<Long>() {
-                int i = 0;
-
-                @Override
-                public boolean hasNext() {
-                    return i < numCells;
-                }
-
-                @Override
-                public Long next() {
-                    return writeSet[i++];
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-
-        Iterable<Long> writeSet() {
-            return this;
-        }
-
-        boolean isRetry() {
-            return isRetry;
-        }
-
-        public final static EventFactory<RequestEvent> EVENT_FACTORY
-                = new EventFactory<RequestEvent>() {
-            @Override
-            public RequestEvent newInstance() {
-                return new RequestEvent();
-            }
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java
deleted file mode 100644
index 70309b9..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-interface RetryProcessor {
-    void disambiguateRetryRequestHeuristically(long startTimestamp, Channel c, MonitoringContext monCtx);
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java
deleted file mode 100644
index 13fbe2d..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.YieldingWaitStrategy;
-import com.yahoo.omid.committable.CommitTable;
-import com.yahoo.omid.committable.CommitTable.CommitTimestamp;
-import com.yahoo.omid.metrics.Meter;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-
-/**
- * Manages the retry requests that clients can send when they did  not received the response in the specified timeout
- */
-class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(RetryProcessor.class);
-
-    // Disruptor chain stuff
-    final ReplyProcessor replyProc;
-    final RingBuffer<RetryEvent> retryRing;
-
-    final CommitTable.Client commitTableClient;
-    final CommitTable.Writer writer;
-
-    // Metrics
-    final Meter retriesMeter;
-
-    @Inject
-    RetryProcessorImpl(MetricsRegistry metrics, CommitTable commitTable, ReplyProcessor replyProc, Panicker panicker)
-            throws IOException
-    {
-
-        this.commitTableClient = commitTable.getClient();
-        this.writer = commitTable.getWriter();
-        this.replyProc = replyProc;
-
-        WaitStrategy strategy = new YieldingWaitStrategy();
-
-        retryRing = RingBuffer.createSingleProducer(RetryEvent.EVENT_FACTORY, 1 << 12, strategy);
-        SequenceBarrier retrySeqBarrier = retryRing.newBarrier();
-        BatchEventProcessor<RetryEvent> retryProcessor = new BatchEventProcessor<>(retryRing, retrySeqBarrier, this);
-        retryProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
-        retryRing.addGatingSequences(retryProcessor.getSequence());
-
-        ExecutorService retryExec = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("retry-%d").build());
-        retryExec.submit(retryProcessor);
-
-        // Metrics
-        retriesMeter = metrics.meter(name("tso", "retries"));
-    }
-
-    @Override
-    public void onEvent(final RetryEvent event, final long sequence, final boolean endOfBatch)
-            throws Exception {
-
-        switch (event.getType()) {
-            case COMMIT:
-                // TODO: What happens when the IOException is thrown?
-                handleCommitRetry(event);
-                break;
-            default:
-                assert (false);
-                break;
-        }
-
-    }
-
-    private void handleCommitRetry(RetryEvent event) throws InterruptedException, ExecutionException {
-
-        long startTimestamp = event.getStartTimestamp();
-
-        try {
-            Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
-            if (commitTimestamp.isPresent()) {
-                if (commitTimestamp.get().isValid()) {
-                    LOG.trace("Valid commit TS found in Commit Table");
-                    replyProc.commitResponse(false, startTimestamp, commitTimestamp.get().getValue(),
-                            event.getChannel(), event.getMonCtx());
-                } else {
-                    LOG.trace("Invalid commit TS found in Commit Table");
-                    replyProc.abortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
-                }
-            } else {
-                LOG.trace("No commit TS found in Commit Table");
-                replyProc.abortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Interrupted reading from commit table");
-            Thread.currentThread().interrupt();
-        } catch (ExecutionException e) {
-            LOG.error("Error reading from commit table", e);
-        }
-
-        retriesMeter.mark();
-    }
-
-    @Override
-    public void disambiguateRetryRequestHeuristically(long startTimestamp, Channel c, MonitoringContext monCtx) {
-        long seq = retryRing.next();
-        RetryEvent e = retryRing.get(seq);
-        RetryEvent.makeCommitRetry(e, startTimestamp, c, monCtx);
-        retryRing.publish(seq);
-    }
-
-    public final static class RetryEvent {
-
-        enum Type {
-            COMMIT
-        }
-
-        private Type type = null;
-
-        private long startTimestamp = 0;
-        private Channel channel = null;
-        private MonitoringContext monCtx;
-
-        static void makeCommitRetry(RetryEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
-            e.monCtx = monCtx;
-            e.type = Type.COMMIT;
-            e.startTimestamp = startTimestamp;
-            e.channel = c;
-        }
-
-        MonitoringContext getMonCtx() {
-            return monCtx;
-        }
-
-        Type getType() {
-            return type;
-        }
-
-        Channel getChannel() {
-            return channel;
-        }
-
-        long getStartTimestamp() {
-            return startTimestamp;
-        }
-
-        public final static EventFactory<RetryEvent> EVENT_FACTORY
-                = new EventFactory<RetryEvent>() {
-            @Override
-            public RetryEvent newInstance() {
-                return new RetryEvent();
-            }
-        };
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java b/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java
deleted file mode 100644
index 7cfd77a..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SystemExitPanicker implements Panicker {
-    private static final Logger LOG = LoggerFactory.getLogger(SystemExitPanicker.class);
-    private static final int PANIC_EXIT_CODE = 123;
-
-    @Override
-    public void panic(String reason, Throwable cause) {
-        LOG.error("PANICKING: {}", reason, cause);
-        System.exit(PANIC_EXIT_CODE);
-    }
-}