You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:39 UTC
[22/52] [abbrv] incubator-omid git commit: Move com.yahoo ->
org.apache
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java b/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java
deleted file mode 100644
index e6ff7f1..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/LeaseManager.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractScheduledService;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yahoo.omid.tso.TSOStateManager.TSOState;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.EnsurePath;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Encompasses all the required elements to control the leases required for
- * identifying the master instance when running multiple TSO instances for HA
- * It delegates the initialization of the TSO state and the publication of
- * the instance information when getting the lease to an asynchronous task to
- * continue managing the leases without interruptions.
- */
-class LeaseManager extends AbstractScheduledService implements LeaseManagement {
-
- private static final Logger LOG = LoggerFactory.getLogger(LeaseManager.class);
-
- private final CuratorFramework zkClient;
-
- private final Panicker panicker;
-
- private final String tsoHostAndPort;
-
- private final TSOStateManager stateManager;
- private final ExecutorService tsoStateInitializer = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat("tso-state-initializer")
- .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- panicker.panic(t + " threw exception", e);
- }
- })
- .build());
-
-
- private final long leasePeriodInMs;
- private final TSOChannelHandler tsoChannelHandler;
- private int leaseNodeVersion;
- private final AtomicLong endLeaseInMs = new AtomicLong(0L);
- private final AtomicLong baseTimeInMs = new AtomicLong(0L);
-
- private final String leasePath;
- private final String currentTSOPath;
-
- LeaseManager(String tsoHostAndPort,
- TSOChannelHandler tsoChannelHandler,
- TSOStateManager stateManager,
- long leasePeriodInMs,
- String leasePath,
- String currentTSOPath,
- CuratorFramework zkClient,
- Panicker panicker) {
-
- this.tsoHostAndPort = tsoHostAndPort;
- this.tsoChannelHandler = tsoChannelHandler;
- this.stateManager = stateManager;
- this.leasePeriodInMs = leasePeriodInMs;
- this.leasePath = leasePath;
- this.currentTSOPath = currentTSOPath;
- this.zkClient = zkClient;
- this.panicker = panicker;
- LOG.info("LeaseManager {} initialized. Lease period {}ms", toString(), leasePeriodInMs);
-
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // LeaseManagement implementation
- // ----------------------------------------------------------------------------------------------------------------
-
- @Override
- public void startService() throws LeaseManagementException {
- createLeaseManagementZNode();
- createCurrentTSOZNode();
- startAndWait();
- }
-
- @Override
- public void stopService() throws LeaseManagementException {
- stopAndWait();
- }
-
- @Override
- public boolean stillInLeasePeriod() {
- return System.currentTimeMillis() <= getEndLeaseInMs();
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // End LeaseManagement implementation
- // ----------------------------------------------------------------------------------------------------------------
-
- void tryToGetInitialLeasePeriod() throws Exception {
- baseTimeInMs.set(System.currentTimeMillis());
- if (canAcquireLease()) {
- endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
- LOG.info("{} got the lease (Master) Ver. {}/End of lease: {}ms", tsoHostAndPort,
- leaseNodeVersion, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(endLeaseInMs));
- tsoStateInitializer.submit(new Runnable() {
- // TSO State initialization
- @Override
- public void run() {
- try {
- TSOState newTSOState = stateManager.reset();
- advertiseTSOServerInfoThroughZK(newTSOState.getEpoch());
- tsoChannelHandler.reconnect();
- } catch (Exception e) {
- Thread t = Thread.currentThread();
- t.getUncaughtExceptionHandler().uncaughtException(t, e);
- }
- }
- });
- } else {
- tsoStateInitializer.submit(new Runnable() {
- // TSO State initialization
- @Override
- public void run() {
- // In case the TSO was paused close the connection
- tsoChannelHandler.closeConnection();
- }
- });
- }
- }
-
- void tryToRenewLeasePeriod() throws Exception {
- baseTimeInMs.set(System.currentTimeMillis());
- if (canAcquireLease()) {
- if (System.currentTimeMillis() > getEndLeaseInMs()) {
- endLeaseInMs.set(0L);
- LOG.warn("{} expired lease! Releasing lease to start Master re-election", tsoHostAndPort);
- tsoChannelHandler.closeConnection();
- } else {
- endLeaseInMs.set(baseTimeInMs.get() + leasePeriodInMs);
- LOG.trace("{} renewed lease: Version {}/End of lease at {}ms",
- tsoHostAndPort, leaseNodeVersion, endLeaseInMs);
- }
- } else {
- endLeaseInMs.set(0L);
- LOG.warn("{} lost the lease (Ver. {})! Other instance is now Master",
- tsoHostAndPort, leaseNodeVersion);
- tsoChannelHandler.closeConnection();
- }
- }
-
- private boolean haveLease() {
- return stillInLeasePeriod();
- }
-
- private long getEndLeaseInMs() {
- return endLeaseInMs.get();
- }
-
- private boolean canAcquireLease() throws Exception {
- try {
- int previousLeaseNodeVersion = leaseNodeVersion;
- final byte[] instanceInfo = tsoHostAndPort.getBytes(Charsets.UTF_8);
- // Try to acquire the lease
- Stat stat = zkClient.setData().withVersion(previousLeaseNodeVersion)
- .forPath(leasePath, instanceInfo);
- leaseNodeVersion = stat.getVersion();
- LOG.trace("{} got new lease version {}", tsoHostAndPort, leaseNodeVersion);
- } catch (KeeperException.BadVersionException e) {
- return false;
- }
- return true;
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // AbstractScheduledService implementation
- // ----------------------------------------------------------------------------------------------------------------
-
- @Override
- protected void startUp() {
- }
-
- @Override
- protected void shutDown() {
- try {
- tsoChannelHandler.close();
- LOG.info("Channel handler closed");
- } catch (IOException e) {
- LOG.error("Error closing TSOChannelHandler", e);
- }
- }
-
- @Override
- protected void runOneIteration() throws Exception {
-
- if (!haveLease()) {
- tryToGetInitialLeasePeriod();
- } else {
- tryToRenewLeasePeriod();
- }
-
- }
-
- @Override
- protected Scheduler scheduler() {
-
- final long guardLeasePeriodInMs = leasePeriodInMs / 4;
-
- return new AbstractScheduledService.CustomScheduler() {
-
- @Override
- protected Schedule getNextSchedule() throws Exception {
- if (!haveLease()) {
- // Get the current node version...
- Stat stat = zkClient.checkExists().forPath(leasePath);
- leaseNodeVersion = stat.getVersion();
- LOG.trace("{} will try to get lease (with Ver. {}) in {}ms", tsoHostAndPort, leaseNodeVersion,
- leasePeriodInMs);
- // ...and wait the lease period
- return new Schedule(leasePeriodInMs, TimeUnit.MILLISECONDS);
- } else {
- long waitTimeInMs = getEndLeaseInMs() - System.currentTimeMillis() - guardLeasePeriodInMs;
- LOG.trace("{} will try to renew lease (with Ver. {}) in {}ms", tsoHostAndPort,
- leaseNodeVersion, waitTimeInMs);
- return new Schedule(waitTimeInMs, TimeUnit.MILLISECONDS);
- }
- }
- };
-
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // Helper methods
- // ----------------------------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return tsoHostAndPort;
- }
-
- private void createLeaseManagementZNode() throws LeaseManagementException {
- try {
- validateZKPath(leasePath);
- } catch (Exception e) {
- throw new LeaseManagementException("Error creating Lease Management ZNode", e);
- }
- }
-
- private void createCurrentTSOZNode() throws LeaseManagementException {
- try {
- validateZKPath(currentTSOPath);
- } catch (Exception e) {
- throw new LeaseManagementException("Error creating TSO ZNode", e);
- }
- }
-
- private void validateZKPath(String zkPath) throws Exception {
- EnsurePath path = zkClient.newNamespaceAwareEnsurePath(zkPath);
- path.ensure(zkClient.getZookeeperClient());
- Stat stat = zkClient.checkExists().forPath(zkPath);
- Preconditions.checkNotNull(stat);
- LOG.info("Path {} ensured", path.getPath());
- }
-
- private void advertiseTSOServerInfoThroughZK(long epoch) throws Exception {
-
- Stat previousTSOZNodeStat = new Stat();
- byte[] previousTSOInfoAsBytes = zkClient.getData().storingStatIn(previousTSOZNodeStat).forPath(currentTSOPath);
- if (previousTSOInfoAsBytes != null && !new String(previousTSOInfoAsBytes, Charsets.UTF_8).isEmpty()) {
- String previousTSOInfo = new String(previousTSOInfoAsBytes, Charsets.UTF_8);
- String[] previousTSOAndEpochArray = previousTSOInfo.split("#");
- Preconditions.checkArgument(previousTSOAndEpochArray.length == 2, "Incorrect TSO Info found: ", previousTSOInfo);
- long oldEpoch = Long.parseLong(previousTSOAndEpochArray[1]);
- if (oldEpoch > epoch) {
- throw new LeaseManagementException("Another TSO replica was found " + previousTSOInfo);
- }
- }
- String tsoInfoAsString = tsoHostAndPort + "#" + Long.toString(epoch);
- byte[] tsoInfoAsBytes = tsoInfoAsString.getBytes(Charsets.UTF_8);
- zkClient.setData().withVersion(previousTSOZNodeStat.getVersion()).forPath(currentTSOPath, tsoInfoAsBytes);
- LOG.info("TSO instance {} (Epoch {}) advertised through ZK", tsoHostAndPort, epoch);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java b/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java
deleted file mode 100644
index e140d50..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/LongCache.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import java.util.Arrays;
-
-public class LongCache implements Cache {
-
- public static final long RESET_VALUE = 0L;
-
- private final long[] cache;
- private final int size;
- private final int associativity;
-
- public LongCache(int size, int associativity) {
- this.size = size;
- this.cache = new long[2 * (size + associativity)];
- this.associativity = associativity;
- }
-
- @Override
- public void reset() {
- Arrays.fill(cache, RESET_VALUE);
- }
-
- /* (non-Javadoc)
- * @see com.yahoo.omid.tso.Cache#set(long, long)
- */
- @Override
- public long set(long key, long value) {
- final int index = index(key);
- int oldestIndex = 0;
- long oldestValue = Long.MAX_VALUE;
- for (int i = 0; i < associativity; ++i) {
- int currIndex = 2 * (index + i);
- if (cache[currIndex] == key) {
- oldestValue = 0;
- oldestIndex = currIndex;
- break;
- }
- if (cache[currIndex + 1] <= oldestValue) {
- oldestValue = cache[currIndex + 1];
- oldestIndex = currIndex;
- }
- }
- cache[oldestIndex] = key;
- cache[oldestIndex + 1] = value;
- return oldestValue;
- }
-
- /* (non-Javadoc)
- * @see com.yahoo.omid.tso.Cache#get(long)
- */
- @Override
- public long get(long key) {
- final int index = index(key);
- for (int i = 0; i < associativity; ++i) {
- int currIndex = 2 * (index + i);
- if (cache[currIndex] == key) {
- return cache[currIndex + 1];
- }
- }
- return 0;
- }
-
- private int index(long hash) {
- return (int) (Math.abs(hash) % size);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java b/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java
deleted file mode 100644
index aec6141..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/MockPanicker.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MockPanicker implements Panicker {
- private static final Logger LOG = LoggerFactory.getLogger(MockPanicker.class);
-
- @Override
- public void panic(String reason, Throwable cause) {
- LOG.error("PANICKING: {}", reason, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java b/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java
deleted file mode 100644
index c52e96a..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/MonitoringContext.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.base.Throwables;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.concurrent.NotThreadSafe;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-
-@NotThreadSafe
-public class MonitoringContext {
-
- private static final Logger LOG = LoggerFactory.getLogger(MonitoringContext.class);
-
- private volatile boolean flag;
- private Map<String, Long> elapsedTimeMsMap = new HashMap<>();
- private Map<String, Stopwatch> timers = new ConcurrentHashMap<>();
- private MetricsRegistry metrics;
-
- public MonitoringContext(MetricsRegistry metrics) {
- this.metrics = metrics;
- }
-
- public void timerStart(String name) {
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
- timers.put(name, stopwatch);
- }
-
- public void timerStop(String name) {
- if (flag) {
- LOG.warn("timerStop({}) called after publish. Measurement was ignored. {}", name, Throwables.getStackTraceAsString(new Exception()));
- return;
- }
- Stopwatch activeStopwatch = timers.get(name);
- if (activeStopwatch == null) {
- throw new IllegalStateException(
- String.format("There is no %s timer in the %s monitoring context.", name, this));
- }
- activeStopwatch.stop();
- elapsedTimeMsMap.put(name, activeStopwatch.elapsedTime(TimeUnit.NANOSECONDS));
- timers.remove(name);
- }
-
- public void publish() {
- flag = true;
- for (String name : elapsedTimeMsMap.keySet()) {
- Long durationInNs = elapsedTimeMsMap.get(name);
- metrics.timer(name("tso", name)).update(durationInNs);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java b/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java
deleted file mode 100644
index 7a32f1e..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/NetworkInterfaceUtils.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.net.HostAndPort;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.net.UnknownHostException;
-import java.util.Enumeration;
-
-final public class NetworkInterfaceUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(NetworkInterfaceUtils.class);
-
- /**
- * Returns an <code>InetAddress</code> object encapsulating what is most
- * likely the machine's LAN IP address.
- * <p/>
- * This method is intended for use as a replacement of JDK method
- * <code>InetAddress.getLocalHost</code>, because that method is ambiguous
- * on Linux systems. Linux systems enumerate the loopback network
- * interface the same way as regular LAN network interfaces, but the JDK
- * <code>InetAddress.getLocalHost</code> method does not specify the
- * algorithm used to select the address returned under such circumstances,
- * and will often return the loopback address, which is not valid for
- * network communication. Details
- * <a href="http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4665037">here</a>.
- * <p/>
- * This method will scan all IP addresses on a particular network interface
- * specified as parameter on the host machine to determine the IP address
- * most likely to be the machine's LAN address. If the machine has multiple
- * IP addresses, this method will prefer a site-local IP address (e.g.
- * 192.168.x.x or 10.10.x.x, usually IPv4) if the machine has one (and will
- * return the first site-local address if the machine has more than one),
- * but if the machine does not hold a site-local address, this method will
- * return simply the first non-loopback address found (IPv4 or IPv6).
- * <p/>
- * If this method cannot find a non-loopback address using this selection
- * algorithm, it will fall back to calling and returning the result of JDK
- * method <code>InetAddress.getLocalHost()</code>.
- * <p/>
- *
- * @param ifaceName
- * The name of the network interface to extract the IP address
- * from
- * @throws UnknownHostException
- * If the LAN address of the machine cannot be found.
- */
- static InetAddress getIPAddressFromNetworkInterface(String ifaceName)
- throws SocketException, UnknownHostException {
-
- NetworkInterface iface = NetworkInterface.getByName(ifaceName);
- if (iface == null) {
- throw new IllegalArgumentException(
- "Network interface " + ifaceName + " not found");
- }
-
- InetAddress candidateAddress = null;
- Enumeration<InetAddress> inetAddrs = iface.getInetAddresses();
- while (inetAddrs.hasMoreElements()) {
- InetAddress inetAddr = inetAddrs.nextElement();
- if (!inetAddr.isLoopbackAddress()) {
- if (inetAddr.isSiteLocalAddress()) {
- return inetAddr; // Return non-loopback site-local address
- } else if (candidateAddress == null) {
- // Found non-loopback address, but not necessarily site-local
- candidateAddress = inetAddr;
- }
- }
- }
-
- if (candidateAddress != null) {
- // Site-local address not found, but found other non-loopback addr
- // Server might have a non-site-local address assigned to its NIC
- // (or might be running IPv6 which deprecates "site-local" concept)
- return candidateAddress;
- }
-
- // At this point, we did not find a non-loopback address.
- // Fall back to returning whatever InetAddress.getLocalHost() returns
- InetAddress jdkSuppliedAddress = InetAddress.getLocalHost();
- if (jdkSuppliedAddress == null) {
- throw new UnknownHostException(
- "InetAddress.getLocalHost() unexpectedly returned null.");
- }
- return jdkSuppliedAddress;
- }
-
- public static String getTSOHostAndPort(TSOServerConfig config) throws SocketException, UnknownHostException {
-
- // Build TSO host:port string and validate it
- final String tsoNetIfaceName = config.getNetworkIfaceName();
- InetAddress addr = getIPAddressFromNetworkInterface(tsoNetIfaceName);
- final int tsoPort = config.getPort();
-
- String tsoHostAndPortAsString = "N/A";
- try {
- tsoHostAndPortAsString = HostAndPort.fromParts(addr.getHostAddress(), tsoPort).toString();
- } catch (IllegalArgumentException e) {
- LOG.error("Cannot parse TSO host:port string {}", tsoHostAndPortAsString);
- throw e;
- }
- return tsoHostAndPortAsString;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java b/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java
deleted file mode 100644
index 47860ae..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/Panicker.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-public interface Panicker {
- public void panic(String reason, Throwable cause);
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java b/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java
deleted file mode 100644
index 98be866..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/PausableTimestampOracle.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.timestamp.storage.TimestampStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.io.IOException;
-
-public class PausableTimestampOracle extends TimestampOracleImpl {
-
- private static final Logger LOG = LoggerFactory.getLogger(PausableTimestampOracle.class);
-
- private volatile boolean tsoPaused = false;
-
- @Inject
- public PausableTimestampOracle(MetricsRegistry metrics,
- TimestampStorage tsStorage,
- Panicker panicker) throws IOException {
- super(metrics, tsStorage, panicker);
- }
-
- @Override
- public long next() throws IOException {
- while (tsoPaused) {
- synchronized (this) {
- try {
- this.wait();
- } catch (InterruptedException e) {
- LOG.error("Interrupted whilst paused");
- Thread.currentThread().interrupt();
- }
- }
- }
- return super.next();
- }
-
- public synchronized void pause() {
- tsoPaused = true;
- this.notifyAll();
- }
-
- public synchronized void resume() {
- tsoPaused = false;
- this.notifyAll();
- }
-
- public boolean isTSOPaused() {
- return tsoPaused;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java
deleted file mode 100644
index 4aa7b51..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessor.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-interface PersistenceProcessor {
- void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx);
-
- void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx);
-
- void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx);
-
- void persistLowWatermark(long lowWatermark);
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java
deleted file mode 100644
index 2b46b86..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/PersistenceProcessorImpl.java
+++ /dev/null
@@ -1,431 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
-import com.lmax.disruptor.TimeoutHandler;
-import com.yahoo.omid.committable.CommitTable;
-import com.yahoo.omid.metrics.Histogram;
-import com.yahoo.omid.metrics.Meter;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.metrics.Timer;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import javax.inject.Named;
-import java.io.IOException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-import static com.yahoo.omid.tso.TSOServer.TSO_HOST_AND_PORT_KEY;
-
-class PersistenceProcessorImpl
- implements EventHandler<PersistenceProcessorImpl.PersistEvent>, PersistenceProcessor, TimeoutHandler {
-
- private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessor.class);
-
- private final String tsoHostAndPort;
- private final LeaseManagement leaseManager;
- final ReplyProcessor reply;
- final RetryProcessor retryProc;
- final CommitTable.Client commitTableClient;
- final CommitTable.Writer writer;
- final Panicker panicker;
- final RingBuffer<PersistEvent> persistRing;
- final Batch batch;
- final Timer flushTimer;
- final Histogram batchSizeHistogram;
- final Meter timeoutMeter;
- final int batchPersistTimeoutInMs;
-
- long lastFlush = System.nanoTime();
-
- @Inject
- PersistenceProcessorImpl(TSOServerConfig config,
- MetricsRegistry metrics,
- @Named(TSO_HOST_AND_PORT_KEY) String tsoHostAndPort,
- LeaseManagement leaseManager,
- CommitTable commitTable,
- ReplyProcessor reply,
- RetryProcessor retryProc,
- Panicker panicker)
- throws IOException {
-
- this(config,
- metrics,
- tsoHostAndPort,
- new Batch(config.getMaxBatchSize()),
- leaseManager,
- commitTable,
- reply,
- retryProc,
- panicker);
-
- }
-
- @VisibleForTesting
- PersistenceProcessorImpl(TSOServerConfig config,
- MetricsRegistry metrics,
- String tsoHostAndPort,
- Batch batch,
- LeaseManagement leaseManager,
- CommitTable commitTable,
- ReplyProcessor reply,
- RetryProcessor retryProc,
- Panicker panicker)
- throws IOException {
-
- this.tsoHostAndPort = tsoHostAndPort;
- this.batch = batch;
- this.batchPersistTimeoutInMs = config.getBatchPersistTimeoutInMs();
- this.leaseManager = leaseManager;
- this.commitTableClient = commitTable.getClient();
- this.writer = commitTable.getWriter();
- this.reply = reply;
- this.retryProc = retryProc;
- this.panicker = panicker;
-
- LOG.info("Creating the persist processor with batch size {}, and timeout {}ms",
- config.getMaxBatchSize(), batchPersistTimeoutInMs);
-
- flushTimer = metrics.timer(name("tso", "persist", "flush"));
- batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
- timeoutMeter = metrics.meter(name("tso", "persist", "timeout"));
-
- // FIXME consider putting something more like a phased strategy here to avoid
- // all the syscalls
- final TimeoutBlockingWaitStrategy timeoutStrategy
- = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
-
- persistRing = RingBuffer.createSingleProducer(
- PersistEvent.EVENT_FACTORY, 1 << 20, timeoutStrategy); // 2^20 entries in ringbuffer
- SequenceBarrier persistSequenceBarrier = persistRing.newBarrier();
- BatchEventProcessor<PersistEvent> persistProcessor = new BatchEventProcessor<>(
- persistRing,
- persistSequenceBarrier,
- this);
- persistRing.addGatingSequences(persistProcessor.getSequence());
- persistProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
- ExecutorService persistExec = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("persist-%d").build());
- persistExec.submit(persistProcessor);
-
- }
-
- @Override
- public void onEvent(PersistEvent event, long sequence, boolean endOfBatch) throws Exception {
-
- switch (event.getType()) {
- case COMMIT:
- event.getMonCtx().timerStart("commitPersistProcessor");
- // TODO: What happens when the IOException is thrown?
- writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
- batch.addCommit(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(),
- event.getMonCtx());
- break;
- case ABORT:
- sendAbortOrIdentifyFalsePositive(event.getStartTimestamp(), event.isRetry(), event.getChannel(),
- event.getMonCtx());
- break;
- case TIMESTAMP:
- event.getMonCtx().timerStart("timestampPersistProcessor");
- batch.addTimestamp(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
- break;
- }
- if (batch.isFull() || endOfBatch) {
- maybeFlushBatch();
- }
-
- }
-
- private void sendAbortOrIdentifyFalsePositive(long startTimestamp, boolean isRetry, Channel channel,
- MonitoringContext monCtx) {
-
- if (!isRetry) {
- reply.abortResponse(startTimestamp, channel, monCtx);
- return;
- }
-
- // If is a retry, we must check if it is a already committed request abort.
- // This can happen because a client could have missed the reply, so it
- // retried the request after a timeout. So we added to the batch and when
- // it's flushed we'll add events to the retry processor in order to check
- // for false positive aborts. It needs to be done after the flush in case
- // the commit has occurred but it hasn't been persisted yet.
- batch.addUndecidedRetriedRequest(startTimestamp, channel, monCtx);
- }
-
- // no event has been received in the timeout period
- @Override
- public void onTimeout(final long sequence) {
- maybeFlushBatch();
- }
-
- /**
- * Flush the current batch if it's full, or the timeout has been elapsed since the last flush.
- */
- private void maybeFlushBatch() {
- if (batch.isFull()) {
- flush();
- } else if ((System.nanoTime() - lastFlush) > TimeUnit.MILLISECONDS.toNanos(batchPersistTimeoutInMs)) {
- timeoutMeter.mark();
- flush();
- }
- }
-
- @VisibleForTesting
- synchronized void flush() {
- lastFlush = System.nanoTime();
-
- boolean areWeStillMaster = true;
- if (!leaseManager.stillInLeasePeriod()) {
- // The master TSO replica has changed, so we must inform the
- // clients about it when sending the replies and avoid flushing
- // the current batch of TXs
- areWeStillMaster = false;
- // We need also to clear the data in the buffer
- writer.clearWriteBuffer();
- LOG.trace("Replica {} lost mastership before flushing data", tsoHostAndPort);
- } else {
- try {
- writer.flush();
- } catch (IOException e) {
- panicker.panic("Error persisting commit batch", e.getCause());
- }
- batchSizeHistogram.update(batch.getNumEvents());
- if (!leaseManager.stillInLeasePeriod()) {
- // If after flushing this TSO server is not the master
- // replica we need inform the client about it
- areWeStillMaster = false;
- LOG.warn("Replica {} lost mastership after flushing data", tsoHostAndPort);
- }
- }
- flushTimer.update((System.nanoTime() - lastFlush));
- batch.sendRepliesAndReset(reply, retryProc, areWeStillMaster);
-
- }
-
- @Override
- public void persistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
- long seq = persistRing.next();
- PersistEvent e = persistRing.get(seq);
- PersistEvent.makePersistCommit(e, startTimestamp, commitTimestamp, c, monCtx);
- persistRing.publish(seq);
- }
-
- @Override
- public void persistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) {
- long seq = persistRing.next();
- PersistEvent e = persistRing.get(seq);
- PersistEvent.makePersistAbort(e, startTimestamp, isRetry, c, monCtx);
- persistRing.publish(seq);
- }
-
- @Override
- public void persistTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
- long seq = persistRing.next();
- PersistEvent e = persistRing.get(seq);
- PersistEvent.makePersistTimestamp(e, startTimestamp, c, monCtx);
- persistRing.publish(seq);
- }
-
- @Override
- public void persistLowWatermark(long lowWatermark) {
- try {
- writer.updateLowWatermark(lowWatermark);
- } catch (IOException e) {
- LOG.error("Should not be thrown");
- }
- }
-
- public static class Batch {
-
- final PersistEvent[] events;
- final int maxBatchSize;
- int numEvents;
-
- Batch(int maxBatchSize) {
- assert (maxBatchSize > 0);
- this.maxBatchSize = maxBatchSize;
- events = new PersistEvent[maxBatchSize];
- numEvents = 0;
- for (int i = 0; i < maxBatchSize; i++) {
- events[i] = new PersistEvent();
- }
- }
-
- boolean isFull() {
- assert (numEvents <= maxBatchSize);
- return numEvents == maxBatchSize;
- }
-
- int getNumEvents() {
- return numEvents;
- }
-
- void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
- if (isFull()) {
- throw new IllegalStateException("batch full");
- }
- int index = numEvents++;
- PersistEvent e = events[index];
- PersistEvent.makePersistCommit(e, startTimestamp, commitTimestamp, c, monCtx);
- }
-
- void addUndecidedRetriedRequest(long startTimestamp, Channel c, MonitoringContext monCtx) {
- if (isFull()) {
- throw new IllegalStateException("batch full");
- }
- int index = numEvents++;
- PersistEvent e = events[index];
- // We mark the event as an ABORT retry to identify the events to send
- // to the retry processor
- PersistEvent.makePersistAbort(e, startTimestamp, true, c, monCtx);
- }
-
- void addTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) {
- if (isFull()) {
- throw new IllegalStateException("batch full");
- }
- int index = numEvents++;
- PersistEvent e = events[index];
- PersistEvent.makePersistTimestamp(e, startTimestamp, c, monCtx);
- }
-
- void sendRepliesAndReset(ReplyProcessor reply, RetryProcessor retryProc, boolean isTSOInstanceMaster) {
- for (int i = 0; i < numEvents; i++) {
- PersistEvent e = events[i];
- switch (e.getType()) {
- case TIMESTAMP:
- e.getMonCtx().timerStop("timestampPersistProcessor");
- reply.timestampResponse(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
- break;
- case COMMIT:
- e.getMonCtx().timerStop("commitPersistProcessor");
- if (isTSOInstanceMaster) {
- reply.commitResponse(false, e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(),
- e.getMonCtx());
- } else {
- // The client will need to perform heuristic actions to determine the output
- reply.commitResponse(true, e.getStartTimestamp(), e.getCommitTimestamp(), e.getChannel(),
- e.getMonCtx());
- }
- break;
- case ABORT:
- if (e.isRetry()) {
- retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(),
- e.getMonCtx());
- } else {
- LOG.error("We should not be receiving non-retried aborted requests in here");
- }
- break;
- default:
- LOG.error("We should receive only COMMIT or ABORT event types. Received {}", e.getType());
- break;
- }
- }
- numEvents = 0;
- }
-
- }
-
- public final static class PersistEvent {
-
- private MonitoringContext monCtx;
-
- enum Type {
- TIMESTAMP, COMMIT, ABORT
- }
-
- private Type type = null;
- private Channel channel = null;
-
- private boolean isRetry = false;
- private long startTimestamp = 0;
- private long commitTimestamp = 0;
-
- static void makePersistCommit(PersistEvent e, long startTimestamp, long commitTimestamp, Channel c,
- MonitoringContext monCtx) {
- e.type = Type.COMMIT;
- e.startTimestamp = startTimestamp;
- e.commitTimestamp = commitTimestamp;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- static void makePersistAbort(PersistEvent e, long startTimestamp, boolean isRetry, Channel c,
- MonitoringContext monCtx) {
- e.type = Type.ABORT;
- e.startTimestamp = startTimestamp;
- e.isRetry = isRetry;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- static void makePersistTimestamp(PersistEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
- e.type = Type.TIMESTAMP;
- e.startTimestamp = startTimestamp;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- MonitoringContext getMonCtx() {
- return monCtx;
- }
-
- Type getType() {
- return type;
- }
-
- Channel getChannel() {
- return channel;
- }
-
- boolean isRetry() {
- return isRetry;
- }
-
- long getStartTimestamp() {
- return startTimestamp;
- }
-
- long getCommitTimestamp() {
- return commitTimestamp;
- }
-
- public final static EventFactory<PersistEvent> EVENT_FACTORY = new EventFactory<PersistEvent>() {
- @Override
- public PersistEvent newInstance() {
- return new PersistEvent();
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java
deleted file mode 100644
index f0788ed..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessor.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-interface ReplyProcessor {
- /**
- * Informs the client about the outcome of the Tx it was trying to
- * commit. If the heuristic decision flat is enabled, the client
- * will need to do additional actions for learning the final outcome.
- *
- * @param makeHeuristicDecision
- * informs about whether heuristic actions are needed or not
- * @param startTimestamp
- * the start timestamp of the transaction (a.k.a. tx id)
- * @param commitTimestamp
- * the commit timestamp of the transaction
- * @param channel
- * the communication channed with the client
- */
- void commitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
-
- void abortResponse(long startTimestamp, Channel c, MonitoringContext monCtx);
-
- void timestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx);
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java
deleted file mode 100644
index a69700d..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/ReplyProcessorImpl.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.BusySpinWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.yahoo.omid.metrics.Meter;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.proto.TSOProto;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-
-class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyEvent>, ReplyProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(ReplyProcessorImpl.class);
-
- final RingBuffer<ReplyEvent> replyRing;
- final Meter abortMeter;
- final Meter commitMeter;
- final Meter timestampMeter;
-
- @Inject
- ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker) {
- replyRing = RingBuffer.<ReplyEvent>createMultiProducer(ReplyEvent.EVENT_FACTORY, 1 << 12,
- new BusySpinWaitStrategy());
- SequenceBarrier replySequenceBarrier = replyRing.newBarrier();
- BatchEventProcessor<ReplyEvent> replyProcessor = new BatchEventProcessor<ReplyEvent>(
- replyRing, replySequenceBarrier, this);
- replyProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
- replyRing.addGatingSequences(replyProcessor.getSequence());
-
- ExecutorService replyExec = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("reply-%d").build());
- replyExec.submit(replyProcessor);
-
- abortMeter = metrics.meter(name("tso", "aborts"));
- commitMeter = metrics.meter(name("tso", "commits"));
- timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
- }
-
- public void onEvent(ReplyEvent event, long sequence, boolean endOfBatch) throws Exception {
- String name = null;
- try {
- switch (event.getType()) {
- case COMMIT:
- name = "commitReplyProcessor";
- event.getMonCtx().timerStart(name);
- handleCommitResponse(false, event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
- break;
- case HEURISTIC_COMMIT:
- name = "commitReplyProcessor";
- event.getMonCtx().timerStart(name);
- handleCommitResponse(true, event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
- break;
- case ABORT:
- name = "abortReplyProcessor";
- event.getMonCtx().timerStart(name);
- handleAbortResponse(event.getStartTimestamp(), event.getChannel());
- break;
- case TIMESTAMP:
- name = "timestampReplyProcessor";
- event.getMonCtx().timerStart(name);
- handleTimestampResponse(event.getStartTimestamp(), event.getChannel());
- break;
- default:
- LOG.error("Unknown event {}", event.getType());
- break;
- }
- } finally {
- if (name != null) {
- event.getMonCtx().timerStop(name);
- }
- }
- event.getMonCtx().publish();
- }
-
- @Override
- public void commitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
- long seq = replyRing.next();
- ReplyEvent e = replyRing.get(seq);
- ReplyEvent.makeCommitResponse(makeHeuristicDecision, e, startTimestamp, commitTimestamp, c, monCtx);
- replyRing.publish(seq);
- }
-
- @Override
- public void abortResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
- long seq = replyRing.next();
- ReplyEvent e = replyRing.get(seq);
- ReplyEvent.makeAbortResponse(e, startTimestamp, c, monCtx);
- replyRing.publish(seq);
- }
-
- @Override
- public void timestampResponse(long startTimestamp, Channel c, MonitoringContext monCtx) {
- long seq = replyRing.next();
- ReplyEvent e = replyRing.get(seq);
- ReplyEvent.makeTimestampReponse(e, startTimestamp, c, monCtx);
- replyRing.publish(seq);
- }
-
- void handleCommitResponse(boolean makeHeuristicDecision, long startTimestamp, long commitTimestamp, Channel c) {
- TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
- TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
- if (makeHeuristicDecision) { // If the commit is ambiguous is due to a new master TSO
- commitBuilder.setMakeHeuristicDecision(true);
- }
- commitBuilder.setAborted(false)
- .setStartTimestamp(startTimestamp)
- .setCommitTimestamp(commitTimestamp);
- builder.setCommitResponse(commitBuilder.build());
- c.write(builder.build());
-
- commitMeter.mark();
- }
-
- void handleAbortResponse(long startTimestamp, Channel c) {
- TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
- TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
- commitBuilder.setAborted(true)
- .setStartTimestamp(startTimestamp);
- builder.setCommitResponse(commitBuilder.build());
- c.write(builder.build());
-
- abortMeter.mark();
- }
-
- void handleTimestampResponse(long startTimestamp, Channel c) {
- TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
- TSOProto.TimestampResponse.Builder respBuilder = TSOProto.TimestampResponse.newBuilder();
- respBuilder.setStartTimestamp(startTimestamp);
- builder.setTimestampResponse(respBuilder.build());
- c.write(builder.build());
-
- timestampMeter.mark();
- }
-
- public final static class ReplyEvent {
-
- enum Type {
- TIMESTAMP, COMMIT, HEURISTIC_COMMIT, ABORT
- }
-
- private Type type = null;
- private Channel channel = null;
-
- private long startTimestamp = 0;
- private long commitTimestamp = 0;
- private MonitoringContext monCtx;
-
- Type getType() {
- return type;
- }
-
- Channel getChannel() {
- return channel;
- }
-
- long getStartTimestamp() {
- return startTimestamp;
- }
-
- long getCommitTimestamp() {
- return commitTimestamp;
- }
-
- MonitoringContext getMonCtx() {
- return monCtx;
- }
-
- static void makeTimestampReponse(ReplyEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
- e.type = Type.TIMESTAMP;
- e.startTimestamp = startTimestamp;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- static void makeCommitResponse(boolean makeHeuristicDecision, ReplyEvent e, long startTimestamp,
- long commitTimestamp, Channel c, MonitoringContext monCtx) {
-
- if (makeHeuristicDecision) {
- e.type = Type.HEURISTIC_COMMIT;
- } else {
- e.type = Type.COMMIT;
- }
- e.startTimestamp = startTimestamp;
- e.commitTimestamp = commitTimestamp;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- static void makeAbortResponse(ReplyEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
- e.type = Type.ABORT;
- e.startTimestamp = startTimestamp;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- public final static EventFactory<ReplyEvent> EVENT_FACTORY = new EventFactory<ReplyEvent>() {
- @Override
- public ReplyEvent newInstance() {
- return new ReplyEvent();
- }
- };
-
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java
deleted file mode 100644
index 2870cb7..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessor.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-import java.util.Collection;
-
-// NOTE: public is required explicitly in the interface definition for Guice injection
-public interface RequestProcessor extends TSOStateManager.StateObserver {
-
- void timestampRequest(Channel c, MonitoringContext monCtx);
-
- void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c, MonitoringContext monCtx);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java
deleted file mode 100644
index 6194b2e..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RequestProcessorImpl.java
+++ /dev/null
@@ -1,309 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.BusySpinWaitStrategy;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import com.yahoo.omid.tso.TSOStateManager.TSOState;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
-
- private final TimestampOracle timestampOracle;
- public final CommitHashMap hashmap;
- private final MetricsRegistry metrics;
- private final PersistenceProcessor persistProc;
- private final RingBuffer<RequestEvent> requestRing;
- private long lowWatermark = -1L;
- private long epoch = -1L;
-
- @Inject
- RequestProcessorImpl(TSOServerConfig config,
- MetricsRegistry metrics,
- TimestampOracle timestampOracle,
- PersistenceProcessor persistProc,
- Panicker panicker) throws IOException {
-
- this.metrics = metrics;
-
- this.persistProc = persistProc;
- this.timestampOracle = timestampOracle;
-
- this.hashmap = new CommitHashMap(config.getMaxItems());
-
- // Set up the disruptor thread
- requestRing = RingBuffer.<RequestEvent>createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12,
- new BusySpinWaitStrategy());
- SequenceBarrier requestSequenceBarrier = requestRing.newBarrier();
- BatchEventProcessor<RequestEvent> requestProcessor =
- new BatchEventProcessor<RequestEvent>(requestRing,
- requestSequenceBarrier,
- this);
- requestRing.addGatingSequences(requestProcessor.getSequence());
- requestProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
- ExecutorService requestExec = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("request-%d").build());
- // Each processor runs on a separate thread
- requestExec.submit(requestProcessor);
-
- }
-
- /**
- * This should be called when the TSO gets initialized or gets leadership
- */
- @Override
- public void update(TSOState state) throws IOException {
- LOG.info("Reseting RequestProcessor...");
- this.lowWatermark = state.getLowWatermark();
- persistProc.persistLowWatermark(lowWatermark);
- this.epoch = state.getEpoch();
- hashmap.reset();
- LOG.info("RequestProcessor initialized with LWMs {} and Epoch {}", lowWatermark, epoch);
- }
-
- @Override
- public void onEvent(RequestEvent event, long sequence, boolean endOfBatch) throws Exception {
- String name = null;
- try {
- if (event.getType() == RequestEvent.Type.TIMESTAMP) {
- name = "timestampReqProcessor";
- event.getMonCtx().timerStart(name);
- handleTimestamp(event);
- } else if (event.getType() == RequestEvent.Type.COMMIT) {
- name = "commitReqProcessor";
- event.getMonCtx().timerStart(name);
- handleCommit(event);
- }
- } finally {
- if (name != null) {
- event.getMonCtx().timerStop(name);
- }
- }
-
- }
-
- @Override
- public void timestampRequest(Channel c, MonitoringContext monCtx) {
- long seq = requestRing.next();
- RequestEvent e = requestRing.get(seq);
- RequestEvent.makeTimestampRequest(e, c, monCtx);
- requestRing.publish(seq);
- }
-
- @Override
- public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c, MonitoringContext monCtx) {
- long seq = requestRing.next();
- RequestEvent e = requestRing.get(seq);
- RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
- requestRing.publish(seq);
- }
-
- public void handleTimestamp(RequestEvent requestEvent) {
- long timestamp;
-
- try {
- timestamp = timestampOracle.next();
- } catch (IOException e) {
- LOG.error("Error getting timestamp", e);
- return;
- }
-
- persistProc.persistTimestamp(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
- }
-
- public long handleCommit(RequestEvent event) {
- long startTimestamp = event.getStartTimestamp();
- Iterable<Long> writeSet = event.writeSet();
- boolean isRetry = event.isRetry();
- Channel c = event.getChannel();
-
- boolean committed = false;
- long commitTimestamp = 0L;
-
- int numCellsInWriteset = 0;
- // 0. check if it should abort
- if (startTimestamp <= lowWatermark) {
- committed = false;
- } else {
- // 1. check the write-write conflicts
- committed = true;
- for (long cellId : writeSet) {
- long value = hashmap.getLatestWriteForCell(cellId);
- if (value != 0 && value >= startTimestamp) {
- committed = false;
- break;
- }
- numCellsInWriteset++;
- }
- }
-
- if (committed) {
- // 2. commit
- try {
- commitTimestamp = timestampOracle.next();
-
- if (numCellsInWriteset > 0) {
- long newLowWatermark = lowWatermark;
-
- for (long r : writeSet) {
- long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
- newLowWatermark = Math.max(removed, newLowWatermark);
- }
-
- if (newLowWatermark != lowWatermark) {
- LOG.trace("Setting new low Watermark to {}", newLowWatermark);
- lowWatermark = newLowWatermark;
- persistProc.persistLowWatermark(newLowWatermark);
- }
- }
- persistProc.persistCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
- } catch (IOException e) {
- LOG.error("Error committing", e);
- }
- } else { // add it to the aborted list
- persistProc.persistAbort(startTimestamp, isRetry, c, event.getMonCtx());
- }
-
- return commitTimestamp;
- }
-
- final static class RequestEvent implements Iterable<Long> {
-
- enum Type {
- TIMESTAMP, COMMIT
- }
-
- ;
-
- private Type type = null;
- private Channel channel = null;
-
- private boolean isRetry = false;
- private long startTimestamp = 0;
- private MonitoringContext monCtx;
- private long numCells = 0;
-
- private static final int MAX_INLINE = 40;
- private Long writeSet[] = new Long[MAX_INLINE];
- private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
-
- static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
- e.type = Type.TIMESTAMP;
- e.channel = c;
- e.monCtx = monCtx;
- }
-
- static void makeCommitRequest(RequestEvent e,
- long startTimestamp, MonitoringContext monCtx, Collection<Long> writeSet,
- boolean isRetry, Channel c) {
- e.monCtx = monCtx;
- e.type = Type.COMMIT;
- e.channel = c;
- e.startTimestamp = startTimestamp;
- e.isRetry = isRetry;
- if (writeSet.size() > MAX_INLINE) {
- e.numCells = writeSet.size();
- e.writeSetAsCollection = writeSet;
- } else {
- e.writeSetAsCollection = null;
- e.numCells = writeSet.size();
- int i = 0;
- for (Long cellId : writeSet) {
- e.writeSet[i] = cellId;
- i++;
- }
- }
- }
-
- MonitoringContext getMonCtx() {
- return monCtx;
- }
-
- Type getType() {
- return type;
- }
-
- long getStartTimestamp() {
- return startTimestamp;
- }
-
- Channel getChannel() {
- return channel;
- }
-
- @Override
- public Iterator<Long> iterator() {
- if (writeSetAsCollection != null) {
- return writeSetAsCollection.iterator();
- }
- return new Iterator<Long>() {
- int i = 0;
-
- @Override
- public boolean hasNext() {
- return i < numCells;
- }
-
- @Override
- public Long next() {
- return writeSet[i++];
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- };
- }
-
- Iterable<Long> writeSet() {
- return this;
- }
-
- boolean isRetry() {
- return isRetry;
- }
-
- public final static EventFactory<RequestEvent> EVENT_FACTORY
- = new EventFactory<RequestEvent>() {
- @Override
- public RequestEvent newInstance() {
- return new RequestEvent();
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java b/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java
deleted file mode 100644
index 70309b9..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessor.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.jboss.netty.channel.Channel;
-
-interface RetryProcessor {
- void disambiguateRetryRequestHeuristically(long startTimestamp, Channel c, MonitoringContext monCtx);
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java
deleted file mode 100644
index 13fbe2d..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/RetryProcessorImpl.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.YieldingWaitStrategy;
-import com.yahoo.omid.committable.CommitTable;
-import com.yahoo.omid.committable.CommitTable.CommitTimestamp;
-import com.yahoo.omid.metrics.Meter;
-import com.yahoo.omid.metrics.MetricsRegistry;
-import org.jboss.netty.channel.Channel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static com.yahoo.omid.metrics.MetricsUtils.name;
-
-/**
- * Manages the retry requests that clients can send when they did not received the response in the specified timeout
- */
-class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>, RetryProcessor {
-
- private static final Logger LOG = LoggerFactory.getLogger(RetryProcessor.class);
-
- // Disruptor chain stuff
- final ReplyProcessor replyProc;
- final RingBuffer<RetryEvent> retryRing;
-
- final CommitTable.Client commitTableClient;
- final CommitTable.Writer writer;
-
- // Metrics
- final Meter retriesMeter;
-
- @Inject
- RetryProcessorImpl(MetricsRegistry metrics, CommitTable commitTable, ReplyProcessor replyProc, Panicker panicker)
- throws IOException
- {
-
- this.commitTableClient = commitTable.getClient();
- this.writer = commitTable.getWriter();
- this.replyProc = replyProc;
-
- WaitStrategy strategy = new YieldingWaitStrategy();
-
- retryRing = RingBuffer.createSingleProducer(RetryEvent.EVENT_FACTORY, 1 << 12, strategy);
- SequenceBarrier retrySeqBarrier = retryRing.newBarrier();
- BatchEventProcessor<RetryEvent> retryProcessor = new BatchEventProcessor<>(retryRing, retrySeqBarrier, this);
- retryProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
- retryRing.addGatingSequences(retryProcessor.getSequence());
-
- ExecutorService retryExec = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("retry-%d").build());
- retryExec.submit(retryProcessor);
-
- // Metrics
- retriesMeter = metrics.meter(name("tso", "retries"));
- }
-
- @Override
- public void onEvent(final RetryEvent event, final long sequence, final boolean endOfBatch)
- throws Exception {
-
- switch (event.getType()) {
- case COMMIT:
- // TODO: What happens when the IOException is thrown?
- handleCommitRetry(event);
- break;
- default:
- assert (false);
- break;
- }
-
- }
-
- private void handleCommitRetry(RetryEvent event) throws InterruptedException, ExecutionException {
-
- long startTimestamp = event.getStartTimestamp();
-
- try {
- Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
- if (commitTimestamp.isPresent()) {
- if (commitTimestamp.get().isValid()) {
- LOG.trace("Valid commit TS found in Commit Table");
- replyProc.commitResponse(false, startTimestamp, commitTimestamp.get().getValue(),
- event.getChannel(), event.getMonCtx());
- } else {
- LOG.trace("Invalid commit TS found in Commit Table");
- replyProc.abortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
- }
- } else {
- LOG.trace("No commit TS found in Commit Table");
- replyProc.abortResponse(startTimestamp, event.getChannel(), event.getMonCtx());
- }
- } catch (InterruptedException e) {
- LOG.error("Interrupted reading from commit table");
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- LOG.error("Error reading from commit table", e);
- }
-
- retriesMeter.mark();
- }
-
- @Override
- public void disambiguateRetryRequestHeuristically(long startTimestamp, Channel c, MonitoringContext monCtx) {
- long seq = retryRing.next();
- RetryEvent e = retryRing.get(seq);
- RetryEvent.makeCommitRetry(e, startTimestamp, c, monCtx);
- retryRing.publish(seq);
- }
-
- public final static class RetryEvent {
-
- enum Type {
- COMMIT
- }
-
- private Type type = null;
-
- private long startTimestamp = 0;
- private Channel channel = null;
- private MonitoringContext monCtx;
-
- static void makeCommitRetry(RetryEvent e, long startTimestamp, Channel c, MonitoringContext monCtx) {
- e.monCtx = monCtx;
- e.type = Type.COMMIT;
- e.startTimestamp = startTimestamp;
- e.channel = c;
- }
-
- MonitoringContext getMonCtx() {
- return monCtx;
- }
-
- Type getType() {
- return type;
- }
-
- Channel getChannel() {
- return channel;
- }
-
- long getStartTimestamp() {
- return startTimestamp;
- }
-
- public final static EventFactory<RetryEvent> EVENT_FACTORY
- = new EventFactory<RetryEvent>() {
- @Override
- public RetryEvent newInstance() {
- return new RetryEvent();
- }
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java b/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java
deleted file mode 100644
index 7cfd77a..0000000
--- a/tso-server/src/main/java/com/yahoo/omid/tso/SystemExitPanicker.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tso;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SystemExitPanicker implements Panicker {
- private static final Logger LOG = LoggerFactory.getLogger(SystemExitPanicker.class);
- private static final int PANIC_EXIT_CODE = 123;
-
- @Override
- public void panic(String reason, Throwable cause) {
- LOG.error("PANICKING: {}", reason, cause);
- System.exit(PANIC_EXIT_CODE);
- }
-}