You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:34 UTC
[27/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
deleted file mode 100644
index 4fe8141..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetRoutingService.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Routing Service based on a given {@link com.twitter.common.zookeeper.ServerSet}.
- */
-class ServerSetRoutingService extends Thread implements RoutingService {
-
- private static final Logger logger = LoggerFactory.getLogger(ServerSetRoutingService.class);
-
- static ServerSetRoutingServiceBuilder newServerSetRoutingServiceBuilder() {
- return new ServerSetRoutingServiceBuilder();
- }
-
- /**
- * Builder to build {@link com.twitter.common.zookeeper.ServerSet} based routing service.
- */
- static class ServerSetRoutingServiceBuilder implements RoutingService.Builder {
-
- private ServerSetWatcher serverSetWatcher;
-
- private ServerSetRoutingServiceBuilder() {}
-
- public ServerSetRoutingServiceBuilder serverSetWatcher(ServerSetWatcher serverSetWatcher) {
- this.serverSetWatcher = serverSetWatcher;
- return this;
- }
-
- @Override
- public Builder statsReceiver(StatsReceiver statsReceiver) {
- return this;
- }
-
- @Override
- public RoutingService build() {
- checkNotNull(serverSetWatcher, "No serverset watcher provided.");
- return new ServerSetRoutingService(this.serverSetWatcher);
- }
- }
-
- private static class HostComparator implements Comparator<SocketAddress> {
-
- private static final HostComparator INSTANCE = new HostComparator();
-
- @Override
- public int compare(SocketAddress o1, SocketAddress o2) {
- return o1.toString().compareTo(o2.toString());
- }
- }
-
- private final ServerSetWatcher serverSetWatcher;
-
- private final Set<SocketAddress> hostSet = new HashSet<SocketAddress>();
- private List<SocketAddress> hostList = new ArrayList<SocketAddress>();
- private final HashFunction hasher = Hashing.md5();
-
- // Server Set Changes
- private final AtomicReference<ImmutableSet<DLSocketAddress>> serverSetChange =
- new AtomicReference<ImmutableSet<DLSocketAddress>>(null);
- private final CountDownLatch changeLatch = new CountDownLatch(1);
-
- // Listeners
- protected final CopyOnWriteArraySet<RoutingListener> listeners =
- new CopyOnWriteArraySet<RoutingListener>();
-
- ServerSetRoutingService(ServerSetWatcher serverSetWatcher) {
- super("ServerSetRoutingService");
- this.serverSetWatcher = serverSetWatcher;
- }
-
- @Override
- public Set<SocketAddress> getHosts() {
- synchronized (hostSet) {
- return ImmutableSet.copyOf(hostSet);
- }
- }
-
- @Override
- public void startService() {
- start();
- try {
- if (!changeLatch.await(1, TimeUnit.MINUTES)) {
- logger.warn("No serverset change received in 1 minute.");
- }
- } catch (InterruptedException e) {
- logger.warn("Interrupted waiting first serverset change : ", e);
- }
- logger.info("{} Routing Service Started.", getClass().getSimpleName());
- }
-
- @Override
- public void stopService() {
- Thread.currentThread().interrupt();
- try {
- join();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("Interrupted on waiting serverset routing service to finish : ", e);
- }
- logger.info("{} Routing Service Stopped.", getClass().getSimpleName());
- }
-
- @Override
- public RoutingService registerListener(RoutingListener listener) {
- listeners.add(listener);
- return this;
- }
-
- @Override
- public RoutingService unregisterListener(RoutingListener listener) {
- listeners.remove(listener);
- return this;
- }
-
- @Override
- public SocketAddress getHost(String key, RoutingContext rContext)
- throws NoBrokersAvailableException {
- SocketAddress address = null;
- synchronized (hostSet) {
- if (0 != hostList.size()) {
- int hashCode = hasher.hashUnencodedChars(key).asInt();
- int hostId = signSafeMod(hashCode, hostList.size());
- address = hostList.get(hostId);
- if (rContext.isTriedHost(address)) {
- ArrayList<SocketAddress> newList = new ArrayList<SocketAddress>(hostList);
- newList.remove(hostId);
- // pickup a new host by rehashing it.
- hostId = signSafeMod(hashCode, newList.size());
- address = newList.get(hostId);
- int i = hostId;
- while (rContext.isTriedHost(address)) {
- i = (i + 1) % newList.size();
- if (i == hostId) {
- address = null;
- break;
- }
- address = newList.get(i);
- }
- }
- }
- }
- if (null == address) {
- throw new NoBrokersAvailableException("No host is available.");
- }
- return address;
- }
-
- @Override
- public void removeHost(SocketAddress host, Throwable reason) {
- synchronized (hostSet) {
- if (hostSet.remove(host)) {
- logger.info("Node {} left due to : ", host, reason);
- }
- hostList = new ArrayList<SocketAddress>(hostSet);
- Collections.sort(hostList, HostComparator.INSTANCE);
- logger.info("Host list becomes : {}.", hostList);
- }
- }
-
- @Override
- public void run() {
- try {
- serverSetWatcher.watch(new ServerSetWatcher.ServerSetMonitor() {
- @Override
- public void onChange(ImmutableSet<DLSocketAddress> serviceInstances) {
- ImmutableSet<DLSocketAddress> lastValue = serverSetChange.getAndSet(serviceInstances);
- if (null == lastValue) {
- ImmutableSet<DLSocketAddress> mostRecentValue;
- do {
- mostRecentValue = serverSetChange.get();
- performServerSetChange(mostRecentValue);
- changeLatch.countDown();
- } while (!serverSetChange.compareAndSet(mostRecentValue, null));
- }
- }
- });
- } catch (Exception e) {
- logger.error("Fail to monitor server set : ", e);
- Runtime.getRuntime().exit(-1);
- }
- }
-
- protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serverSet) {
- Set<SocketAddress> newSet = new HashSet<SocketAddress>();
- for (DLSocketAddress serviceInstance : serverSet) {
- newSet.add(serviceInstance.getSocketAddress());
- }
-
- Set<SocketAddress> removed;
- Set<SocketAddress> added;
- synchronized (hostSet) {
- removed = Sets.difference(hostSet, newSet).immutableCopy();
- added = Sets.difference(newSet, hostSet).immutableCopy();
- for (SocketAddress node: removed) {
- if (hostSet.remove(node)) {
- logger.info("Node {} left.", node);
- }
- }
- for (SocketAddress node: added) {
- if (hostSet.add(node)) {
- logger.info("Node {} joined.", node);
- }
- }
- }
-
- for (SocketAddress addr : removed) {
- for (RoutingListener listener : listeners) {
- listener.onServerLeft(addr);
- }
- }
-
- for (SocketAddress addr : added) {
- for (RoutingListener listener : listeners) {
- listener.onServerJoin(addr);
- }
- }
-
- synchronized (hostSet) {
- hostList = new ArrayList<SocketAddress>(hostSet);
- Collections.sort(hostList, HostComparator.INSTANCE);
- logger.info("Host list becomes : {}.", hostList);
- }
-
- }
-
- static int signSafeMod(long dividend, int divisor) {
- int mod = (int) (dividend % divisor);
-
- if (mod < 0) {
- mod += divisor;
- }
-
- return mod;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
deleted file mode 100644
index 77b7beb..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/ServerSetWatcher.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.google.common.collect.ImmutableSet;
-import org.apache.distributedlog.service.DLSocketAddress;
-
-/**
- * Watch on server set changes.
- */
-public interface ServerSetWatcher {
-
- /**
- * Exception thrown when failed to monitor serverset.
- */
- class MonitorException extends Exception {
-
- private static final long serialVersionUID = 392751505154339548L;
-
- public MonitorException(String msg) {
- super(msg);
- }
-
- public MonitorException(String msg, Throwable cause) {
- super(msg, cause);
- }
- }
-
- /**
- * An interface to an object that is interested in receiving notification whenever the host set changes.
- */
- interface ServerSetMonitor {
-
- /**
- * Called when either the available set of services changes.
- *
- * <p>It happens either when a service dies or a new INSTANCE comes on-line or
- * when an existing service advertises a status or health change.
- *
- * @param hostSet the current set of available ServiceInstances
- */
- void onChange(ImmutableSet<DLSocketAddress> hostSet);
- }
-
- /**
- * Registers a monitor to receive change notices for this server set as long as this jvm process is alive.
- *
- * <p>Blocks until the initial server set can be gathered and delivered to the monitor.
- * The monitor will be notified if the membership set or parameters of existing members have
- * changed.
- *
- * @param monitor the server set monitor to call back when the host set changes
- * @throws MonitorException if there is a problem monitoring the host set
- */
- void watch(final ServerSetMonitor monitor) throws MonitorException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
deleted file mode 100644
index 753a1af..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/SingleHostRoutingService.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Sets;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-
-/**
- * Single Host Routing Service.
- */
-public class SingleHostRoutingService implements RoutingService {
-
- public static SingleHostRoutingService of(SocketAddress address) {
- return new SingleHostRoutingService(address);
- }
-
- /**
- * Builder to build single host based routing service.
- *
- * @return builder to build single host based routing service.
- */
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * Builder to build single host based routing service.
- */
- public static class Builder implements RoutingService.Builder {
-
- private SocketAddress address;
-
- private Builder() {}
-
- public Builder address(SocketAddress address) {
- this.address = address;
- return this;
- }
-
- @Override
- public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
- return this;
- }
-
- @Override
- public RoutingService build() {
- checkNotNull(address, "Host is null");
- return new SingleHostRoutingService(address);
- }
- }
-
- private SocketAddress address;
- private final CopyOnWriteArraySet<RoutingListener> listeners =
- new CopyOnWriteArraySet<RoutingListener>();
-
- SingleHostRoutingService(SocketAddress address) {
- this.address = address;
- }
-
- public void setAddress(SocketAddress address) {
- this.address = address;
- }
-
- @Override
- public Set<SocketAddress> getHosts() {
- return Sets.newHashSet(address);
- }
-
- @Override
- public void startService() {
- // no-op
- for (RoutingListener listener : listeners) {
- listener.onServerJoin(address);
- }
- }
-
- @Override
- public void stopService() {
- // no-op
- }
-
- @Override
- public RoutingService registerListener(RoutingListener listener) {
- listeners.add(listener);
- return this;
- }
-
- @Override
- public RoutingService unregisterListener(RoutingListener listener) {
- listeners.remove(listener);
- return null;
- }
-
- @Override
- public SocketAddress getHost(String key, RoutingContext rContext)
- throws NoBrokersAvailableException {
- if (rContext.isTriedHost(address)) {
- throw new NoBrokersAvailableException("No hosts is available : routing context = " + rContext);
- }
- return address;
- }
-
- @Override
- public void removeHost(SocketAddress address, Throwable reason) {
- // no-op
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
deleted file mode 100644
index 2fc8de0..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TestName.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.twitter.finagle.Addr;
-import com.twitter.finagle.Address;
-import com.twitter.finagle.Addrs;
-import com.twitter.finagle.Name;
-import java.util.List;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * A {@link Name} implementation for testing purpose.
- */
-public class TestName implements Name {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestName.class);
-
- private AbstractFunction1<Addr, BoxedUnit> callback = null;
-
- public void changes(AbstractFunction1<Addr, BoxedUnit> callback) {
- this.callback = callback;
- }
-
- public void changeAddrs(List<Address> addresses) {
- if (null != callback) {
- LOG.info("Sending a callback {}", addresses);
- callback.apply(Addrs.newBoundAddr(addresses));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
deleted file mode 100644
index 1ff7c93..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/TwitterServerSetWatcher.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.routing;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.twitter.common.net.pool.DynamicHostSet;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import java.net.InetSocketAddress;
-import java.util.Set;
-
-/**
- * Twitter {@link ServerSet} based watcher.
- */
-public class TwitterServerSetWatcher implements ServerSetWatcher {
-
- private final ServerSet serverSet;
- private final boolean resolvedFromName;
-
- /**
- * Construct a {@link ServerSet} based watcher.
- *
- * @param serverSet server set.
- * @param resolvedFromName whether to resolve hosts from {@link com.twitter.finagle.Name}.
- */
- public TwitterServerSetWatcher(ServerSet serverSet,
- boolean resolvedFromName) {
- this.serverSet = serverSet;
- this.resolvedFromName = resolvedFromName;
- }
-
- /**
- * Registers a monitor to receive change notices for this server set as long as this jvm process is alive.
- *
- * <p>Blocks until the initial server set can be gathered and delivered to the monitor.
- * The monitor will be notified if the membership set or parameters of existing members have
- * changed.
- *
- * @param monitor the server set monitor to call back when the host set changes
- * @throws MonitorException if there is a problem monitoring the host set
- */
- public void watch(final ServerSetMonitor monitor)
- throws MonitorException {
- try {
- serverSet.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
- @Override
- public void onChange(ImmutableSet<ServiceInstance> serviceInstances) {
- Set<DLSocketAddress> dlServers = Sets.newHashSet();
- for (ServiceInstance serviceInstance : serviceInstances) {
- Endpoint endpoint = serviceInstance.getAdditionalEndpoints().get("thrift");
- InetSocketAddress inetAddr =
- new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
- int shardId = resolvedFromName ? -1 : serviceInstance.getShard();
- DLSocketAddress address = new DLSocketAddress(shardId, inetAddr);
- dlServers.add(address);
- }
- monitor.onChange(ImmutableSet.copyOf(dlServers));
- }
- });
- } catch (DynamicHostSet.MonitorException me) {
- throw new MonitorException("Failed to monitor server set : ", me);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
deleted file mode 100644
index 352d755..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/routing/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Routing Mechanisms to route the traffic to the owner of streams.
- */
-package org.apache.distributedlog.client.routing;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
deleted file mode 100644
index 93cdf7a..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/DLZkServerSet.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.serverset;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
-import com.twitter.common.quantity.Amount;
-import com.twitter.common.quantity.Time;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.common.zookeeper.ServerSets;
-import com.twitter.common.zookeeper.ZooKeeperClient;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import org.apache.commons.lang.StringUtils;
-import org.apache.zookeeper.ZooDefs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A wrapper over zookeeper client and its server set.
- */
-public class DLZkServerSet {
-
- private static final Logger logger = LoggerFactory.getLogger(DLZkServerSet.class);
-
- static final String ZNODE_WRITE_PROXY = ".write_proxy";
-
- private static String getZKServersFromDLUri(URI uri) {
- return uri.getAuthority().replace(";", ",");
- }
-
- private static Iterable<InetSocketAddress> getZkAddresses(URI uri) {
- String zkServers = getZKServersFromDLUri(uri);
- String[] zkServerList = StringUtils.split(zkServers, ',');
- ImmutableList.Builder<InetSocketAddress> builder = ImmutableList.builder();
- for (String zkServer : zkServerList) {
- HostAndPort hostAndPort = HostAndPort.fromString(zkServer).withDefaultPort(2181);
- builder.add(InetSocketAddress.createUnresolved(
- hostAndPort.getHostText(),
- hostAndPort.getPort()));
- }
- return builder.build();
- }
-
- public static DLZkServerSet of(URI uri,
- int zkSessionTimeoutMs) {
- // Create zookeeper and server set
- String zkPath = uri.getPath() + "/" + ZNODE_WRITE_PROXY;
- Iterable<InetSocketAddress> zkAddresses = getZkAddresses(uri);
- ZooKeeperClient zkClient =
- new ZooKeeperClient(Amount.of(zkSessionTimeoutMs, Time.MILLISECONDS), zkAddresses);
- ServerSet serverSet = ServerSets.create(zkClient, ZooDefs.Ids.OPEN_ACL_UNSAFE, zkPath);
- return new DLZkServerSet(zkClient, serverSet);
- }
-
- private final ZooKeeperClient zkClient;
- private final ServerSet zkServerSet;
-
- public DLZkServerSet(ZooKeeperClient zkClient,
- ServerSet zkServerSet) {
- this.zkClient = zkClient;
- this.zkServerSet = zkServerSet;
- }
-
- public ZooKeeperClient getZkClient() {
- return zkClient;
- }
-
- public ServerSet getServerSet() {
- return zkServerSet;
- }
-
- public void close() {
- zkClient.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
deleted file mode 100644
index 38a7544..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/serverset/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Utils related to server set.
- */
-package org.apache.distributedlog.client.serverset;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
deleted file mode 100644
index f1da33c..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/DefaultSpeculativeRequestExecutionPolicy.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.speculative;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Default implementation of {@link SpeculativeRequestExecutionPolicy}.
- */
-public class DefaultSpeculativeRequestExecutionPolicy implements SpeculativeRequestExecutionPolicy {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultSpeculativeRequestExecutionPolicy.class);
- final int firstSpeculativeRequestTimeout;
- final int maxSpeculativeRequestTimeout;
- final float backoffMultiplier;
- int nextSpeculativeRequestTimeout;
-
- public DefaultSpeculativeRequestExecutionPolicy(int firstSpeculativeRequestTimeout,
- int maxSpeculativeRequestTimeout,
- float backoffMultiplier) {
- this.firstSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
- this.maxSpeculativeRequestTimeout = maxSpeculativeRequestTimeout;
- this.backoffMultiplier = backoffMultiplier;
- this.nextSpeculativeRequestTimeout = firstSpeculativeRequestTimeout;
-
- if (backoffMultiplier <= 0) {
- throw new IllegalArgumentException("Invalid value provided for backoffMultiplier");
- }
-
- // Prevent potential over flow
- if (Math.round((double) maxSpeculativeRequestTimeout * (double) backoffMultiplier) > Integer.MAX_VALUE) {
- throw new IllegalArgumentException("Invalid values for maxSpeculativeRequestTimeout and backoffMultiplier");
- }
- }
-
- @VisibleForTesting
- int getNextSpeculativeRequestTimeout() {
- return nextSpeculativeRequestTimeout;
- }
-
- /**
- * Initialize the speculative request execution policy.
- *
- * @param scheduler The scheduler service to issue the speculative request
- * @param requestExecutor The executor is used to issue the actual speculative requests
- */
- @Override
- public void initiateSpeculativeRequest(final ScheduledExecutorService scheduler,
- final SpeculativeRequestExecutor requestExecutor) {
- issueSpeculativeRequest(scheduler, requestExecutor);
- }
-
- private void issueSpeculativeRequest(final ScheduledExecutorService scheduler,
- final SpeculativeRequestExecutor requestExecutor) {
- Future<Boolean> issueNextRequest = requestExecutor.issueSpeculativeRequest();
- issueNextRequest.addEventListener(new FutureEventListener<Boolean>() {
- // we want this handler to run immediately after we push the big red button!
- @Override
- public void onSuccess(Boolean issueNextRequest) {
- if (issueNextRequest) {
- scheduleSpeculativeRequest(scheduler, requestExecutor, nextSpeculativeRequestTimeout);
- nextSpeculativeRequestTimeout = Math.min(maxSpeculativeRequestTimeout,
- (int) (nextSpeculativeRequestTimeout * backoffMultiplier));
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Stopped issuing speculative requests for {}, "
- + "speculativeReadTimeout = {}", requestExecutor, nextSpeculativeRequestTimeout);
- }
- }
- }
-
- @Override
- public void onFailure(Throwable thrown) {
- LOG.warn("Failed to issue speculative request for {}, speculativeReadTimeout = {} : ",
- new Object[] { requestExecutor, nextSpeculativeRequestTimeout, thrown });
- }
- });
- }
-
- private void scheduleSpeculativeRequest(final ScheduledExecutorService scheduler,
- final SpeculativeRequestExecutor requestExecutor,
- final int speculativeRequestTimeout) {
- try {
- scheduler.schedule(new Runnable() {
- @Override
- public void run() {
- issueSpeculativeRequest(scheduler, requestExecutor);
- }
- }, speculativeRequestTimeout, TimeUnit.MILLISECONDS);
- } catch (RejectedExecutionException re) {
- if (!scheduler.isShutdown()) {
- LOG.warn("Failed to schedule speculative request for {}, speculativeReadTimeout = {} : ",
- new Object[]{requestExecutor, speculativeRequestTimeout, re});
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
deleted file mode 100644
index faf45c2..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutionPolicy.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.speculative;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * Speculative request execution policy.
- */
-public interface SpeculativeRequestExecutionPolicy {
- /**
- * Initialize the speculative request execution policy and initiate requests.
- *
- * @param scheduler The scheduler service to issue the speculative request
- * @param requestExecutor The executor is used to issue the actual speculative requests
- */
- void initiateSpeculativeRequest(ScheduledExecutorService scheduler,
- SpeculativeRequestExecutor requestExecutor);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
deleted file mode 100644
index 68fe8b0..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/SpeculativeRequestExecutor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.speculative;
-
-import com.twitter.util.Future;
-
-/**
- * Executor to execute speculative requests.
- */
-public interface SpeculativeRequestExecutor {
-
- /**
- * Issues a speculative request and indicates if more speculative requests should be issued.
- *
- * @return whether more speculative requests should be issued.
- */
- Future<Boolean> issueSpeculativeRequest();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
deleted file mode 100644
index 4bdd4b1..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/speculative/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Speculative Mechanism.
- */
-package org.apache.distributedlog.client.speculative;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
deleted file mode 100644
index c2dcddd..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStats.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.thrift.service.StatusCode;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Client Stats.
- */
-public class ClientStats {
-
- // Region Resolver
- private final RegionResolver regionResolver;
-
- // Stats
- private final StatsReceiver statsReceiver;
- private final ClientStatsLogger clientStatsLogger;
- private final boolean enableRegionStats;
- private final ConcurrentMap<String, ClientStatsLogger> regionClientStatsLoggers;
- private final ConcurrentMap<String, OpStats> opStatsMap;
-
- public ClientStats(StatsReceiver statsReceiver,
- boolean enableRegionStats,
- RegionResolver regionResolver) {
- this.statsReceiver = statsReceiver;
- this.clientStatsLogger = new ClientStatsLogger(statsReceiver);
- this.enableRegionStats = enableRegionStats;
- this.regionClientStatsLoggers = new ConcurrentHashMap<String, ClientStatsLogger>();
- this.regionResolver = regionResolver;
- this.opStatsMap = new ConcurrentHashMap<String, OpStats>();
- }
-
- public OpStats getOpStats(String op) {
- OpStats opStats = opStatsMap.get(op);
- if (null != opStats) {
- return opStats;
- }
- OpStats newStats = new OpStats(statsReceiver.scope(op),
- enableRegionStats, regionResolver);
- OpStats oldStats = opStatsMap.putIfAbsent(op, newStats);
- if (null == oldStats) {
- return newStats;
- } else {
- return oldStats;
- }
- }
-
- private ClientStatsLogger getRegionClientStatsLogger(SocketAddress address) {
- String region = regionResolver.resolveRegion(address);
- return getRegionClientStatsLogger(region);
- }
-
- private ClientStatsLogger getRegionClientStatsLogger(String region) {
- ClientStatsLogger statsLogger = regionClientStatsLoggers.get(region);
- if (null == statsLogger) {
- ClientStatsLogger newStatsLogger = new ClientStatsLogger(statsReceiver.scope(region));
- ClientStatsLogger oldStatsLogger = regionClientStatsLoggers.putIfAbsent(region, newStatsLogger);
- if (null == oldStatsLogger) {
- statsLogger = newStatsLogger;
- } else {
- statsLogger = oldStatsLogger;
- }
- }
- return statsLogger;
- }
-
- public StatsReceiver getFinagleStatsReceiver(SocketAddress addr) {
- if (enableRegionStats && null != addr) {
- return getRegionClientStatsLogger(addr).getStatsReceiver();
- } else {
- return clientStatsLogger.getStatsReceiver();
- }
- }
-
- public void completeProxyRequest(SocketAddress addr, StatusCode code, long startTimeNanos) {
- clientStatsLogger.completeProxyRequest(code, startTimeNanos);
- if (enableRegionStats && null != addr) {
- getRegionClientStatsLogger(addr).completeProxyRequest(code, startTimeNanos);
- }
- }
-
- public void failProxyRequest(SocketAddress addr, Throwable cause, long startTimeNanos) {
- clientStatsLogger.failProxyRequest(cause, startTimeNanos);
- if (enableRegionStats && null != addr) {
- getRegionClientStatsLogger(addr).failProxyRequest(cause, startTimeNanos);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
deleted file mode 100644
index 530c632..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/ClientStatsLogger.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import org.apache.distributedlog.thrift.service.StatusCode;
-import com.twitter.finagle.stats.Counter;
-import com.twitter.finagle.stats.Stat;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Stats Logger to collect client stats.
- */
-public class ClientStatsLogger {
-
- // Stats
- private final StatsReceiver statsReceiver;
- private final StatsReceiver responseStatsReceiver;
- private final ConcurrentMap<StatusCode, Counter> responseStats =
- new ConcurrentHashMap<StatusCode, Counter>();
- private final StatsReceiver exceptionStatsReceiver;
- private final ConcurrentMap<Class<?>, Counter> exceptionStats =
- new ConcurrentHashMap<Class<?>, Counter>();
-
- private final Stat proxySuccessLatencyStat;
- private final Stat proxyFailureLatencyStat;
-
- public ClientStatsLogger(StatsReceiver statsReceiver) {
- this.statsReceiver = statsReceiver;
- responseStatsReceiver = statsReceiver.scope("responses");
- exceptionStatsReceiver = statsReceiver.scope("exceptions");
- StatsReceiver proxyLatencyStatReceiver = statsReceiver.scope("proxy_request_latency");
- proxySuccessLatencyStat = proxyLatencyStatReceiver.stat0("success");
- proxyFailureLatencyStat = proxyLatencyStatReceiver.stat0("failure");
- }
-
- public StatsReceiver getStatsReceiver() {
- return statsReceiver;
- }
-
- private Counter getResponseCounter(StatusCode code) {
- Counter counter = responseStats.get(code);
- if (null == counter) {
- Counter newCounter = responseStatsReceiver.counter0(code.name());
- Counter oldCounter = responseStats.putIfAbsent(code, newCounter);
- counter = null != oldCounter ? oldCounter : newCounter;
- }
- return counter;
- }
-
- private Counter getExceptionCounter(Class<?> cls) {
- Counter counter = exceptionStats.get(cls);
- if (null == counter) {
- Counter newCounter = exceptionStatsReceiver.counter0(cls.getName());
- Counter oldCounter = exceptionStats.putIfAbsent(cls, newCounter);
- counter = null != oldCounter ? oldCounter : newCounter;
- }
- return counter;
- }
-
- public void completeProxyRequest(StatusCode code, long startTimeNanos) {
- getResponseCounter(code).incr();
- proxySuccessLatencyStat.add(elapsedMicroSec(startTimeNanos));
- }
-
- public void failProxyRequest(Throwable cause, long startTimeNanos) {
- getExceptionCounter(cause.getClass()).incr();
- proxyFailureLatencyStat.add(elapsedMicroSec(startTimeNanos));
- }
-
- static long elapsedMicroSec(long startNanoTime) {
- return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanoTime);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
deleted file mode 100644
index 7a49faa..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStats.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Op Stats.
- */
-public class OpStats {
-
- // Region Resolver
- private final RegionResolver regionResolver;
-
- // Stats
- private final StatsReceiver statsReceiver;
- private final OpStatsLogger opStatsLogger;
- private final boolean enableRegionStats;
- private final ConcurrentMap<String, OpStatsLogger> regionOpStatsLoggers;
-
- public OpStats(StatsReceiver statsReceiver,
- boolean enableRegionStats,
- RegionResolver regionResolver) {
- this.statsReceiver = statsReceiver;
- this.opStatsLogger = new OpStatsLogger(statsReceiver);
- this.enableRegionStats = enableRegionStats;
- this.regionOpStatsLoggers = new ConcurrentHashMap<String, OpStatsLogger>();
- this.regionResolver = regionResolver;
- }
-
- private OpStatsLogger getRegionOpStatsLogger(SocketAddress address) {
- String region = regionResolver.resolveRegion(address);
- return getRegionOpStatsLogger(region);
- }
-
- private OpStatsLogger getRegionOpStatsLogger(String region) {
- OpStatsLogger statsLogger = regionOpStatsLoggers.get(region);
- if (null == statsLogger) {
- OpStatsLogger newStatsLogger = new OpStatsLogger(statsReceiver.scope(region));
- OpStatsLogger oldStatsLogger = regionOpStatsLoggers.putIfAbsent(region, newStatsLogger);
- if (null == oldStatsLogger) {
- statsLogger = newStatsLogger;
- } else {
- statsLogger = oldStatsLogger;
- }
- }
- return statsLogger;
- }
-
- public void completeRequest(SocketAddress addr, long micros, int numTries) {
- opStatsLogger.completeRequest(micros, numTries);
- if (enableRegionStats && null != addr) {
- getRegionOpStatsLogger(addr).completeRequest(micros, numTries);
- }
- }
-
- public void failRequest(SocketAddress addr, long micros, int numTries) {
- opStatsLogger.failRequest(micros, numTries);
- if (enableRegionStats && null != addr) {
- getRegionOpStatsLogger(addr).failRequest(micros, numTries);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
deleted file mode 100644
index b94b4be..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OpStatsLogger.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import com.twitter.finagle.stats.Stat;
-import com.twitter.finagle.stats.StatsReceiver;
-
-/**
- * Stats Logger per operation type.
- */
-public class OpStatsLogger {
-
- private final Stat successLatencyStat;
- private final Stat failureLatencyStat;
- private final Stat redirectStat;
-
- public OpStatsLogger(StatsReceiver statsReceiver) {
- StatsReceiver latencyStatReceiver = statsReceiver.scope("latency");
- successLatencyStat = latencyStatReceiver.stat0("success");
- failureLatencyStat = latencyStatReceiver.stat0("failure");
- StatsReceiver redirectStatReceiver = statsReceiver.scope("redirects");
- redirectStat = redirectStatReceiver.stat0("times");
- }
-
- public void completeRequest(long micros, int numTries) {
- successLatencyStat.add(micros);
- redirectStat.add(numTries);
- }
-
- public void failRequest(long micros, int numTries) {
- failureLatencyStat.add(micros);
- redirectStat.add(numTries);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
deleted file mode 100644
index 110e99a..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/OwnershipStatsLogger.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.stats;
-
-import com.twitter.finagle.stats.Counter;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Stats Logger for ownerships.
- */
-public class OwnershipStatsLogger {
-
- /**
- * Ownership related stats.
- */
- public static class OwnershipStat {
- private final Counter hits;
- private final Counter misses;
- private final Counter removes;
- private final Counter redirects;
- private final Counter adds;
-
- OwnershipStat(StatsReceiver ownershipStats) {
- hits = ownershipStats.counter0("hits");
- misses = ownershipStats.counter0("misses");
- adds = ownershipStats.counter0("adds");
- removes = ownershipStats.counter0("removes");
- redirects = ownershipStats.counter0("redirects");
- }
-
- public void onHit() {
- hits.incr();
- }
-
- public void onMiss() {
- misses.incr();
- }
-
- public void onAdd() {
- adds.incr();
- }
-
- public void onRemove() {
- removes.incr();
- }
-
- public void onRedirect() {
- redirects.incr();
- }
-
- }
-
- private final OwnershipStat ownershipStat;
- private final StatsReceiver ownershipStatsReceiver;
- private final ConcurrentMap<String, OwnershipStat> ownershipStats =
- new ConcurrentHashMap<String, OwnershipStat>();
-
- public OwnershipStatsLogger(StatsReceiver statsReceiver,
- StatsReceiver streamStatsReceiver) {
- this.ownershipStat = new OwnershipStat(statsReceiver.scope("ownership"));
- this.ownershipStatsReceiver = streamStatsReceiver.scope("perstream_ownership");
- }
-
- private OwnershipStat getOwnershipStat(String stream) {
- OwnershipStat stat = ownershipStats.get(stream);
- if (null == stat) {
- OwnershipStat newStat = new OwnershipStat(ownershipStatsReceiver.scope(stream));
- OwnershipStat oldStat = ownershipStats.putIfAbsent(stream, newStat);
- stat = null != oldStat ? oldStat : newStat;
- }
- return stat;
- }
-
- public void onMiss(String stream) {
- ownershipStat.onMiss();
- getOwnershipStat(stream).onMiss();
- }
-
- public void onHit(String stream) {
- ownershipStat.onHit();
- getOwnershipStat(stream).onHit();
- }
-
- public void onRedirect(String stream) {
- ownershipStat.onRedirect();
- getOwnershipStat(stream).onRedirect();
- }
-
- public void onRemove(String stream) {
- ownershipStat.onRemove();
- getOwnershipStat(stream).onRemove();
- }
-
- public void onAdd(String stream) {
- ownershipStat.onAdd();
- getOwnershipStat(stream).onAdd();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
deleted file mode 100644
index 106d3fc..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/client/stats/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Client side stats utils.
- */
-package org.apache.distributedlog.client.stats;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
deleted file mode 100644
index 68e6825..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DLSocketAddress.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-
-/**
- * Socket Address identifier for a DL proxy.
- */
-public class DLSocketAddress {
-
- private static final int VERSION = 1;
-
- private static final String COLON = ":";
- private static final String SEP = ";";
-
- private final int shard;
- private final InetSocketAddress socketAddress;
-
- public DLSocketAddress(int shard, InetSocketAddress socketAddress) {
- this.shard = shard;
- this.socketAddress = socketAddress;
- }
-
- /**
- * Shard id for dl write proxy.
- *
- * @return shard id for dl write proxy.
- */
- public int getShard() {
- return shard;
- }
-
- /**
- * Socket address for dl write proxy.
- *
- * @return socket address for dl write proxy
- */
- public InetSocketAddress getSocketAddress() {
- return socketAddress;
- }
-
- /**
- * Serialize the write proxy identifier to string.
- *
- * @return serialized write proxy identifier.
- */
- public String serialize() {
- return toLockId(socketAddress, shard);
- }
-
- @Override
- public int hashCode() {
- return socketAddress.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof DLSocketAddress)) {
- return false;
- }
- DLSocketAddress other = (DLSocketAddress) obj;
- return shard == other.shard && socketAddress.equals(other.socketAddress);
- }
-
- @Override
- public String toString() {
- return toLockId(socketAddress, shard);
- }
-
- /**
- * Deserialize proxy address from a string representation.
- *
- * @param lockId
- * string representation of the proxy address.
- * @return proxy address.
- * @throws IOException
- */
- public static DLSocketAddress deserialize(String lockId) throws IOException {
- String parts[] = lockId.split(SEP);
- if (3 != parts.length) {
- throw new IOException("Invalid dl socket address " + lockId);
- }
- int version;
- try {
- version = Integer.parseInt(parts[0]);
- } catch (NumberFormatException nfe) {
- throw new IOException("Invalid version found in " + lockId, nfe);
- }
- if (VERSION != version) {
- throw new IOException("Invalid version " + version + " found in " + lockId + ", expected " + VERSION);
- }
- int shardId;
- try {
- shardId = Integer.parseInt(parts[1]);
- } catch (NumberFormatException nfe) {
- throw new IOException("Invalid shard id found in " + lockId, nfe);
- }
- InetSocketAddress address = parseSocketAddress(parts[2]);
- return new DLSocketAddress(shardId, address);
- }
-
- /**
- * Parse the inet socket address from the string representation.
- *
- * @param addr
- * string representation
- * @return inet socket address
- */
- public static InetSocketAddress parseSocketAddress(String addr) {
- String[] parts = addr.split(COLON);
- checkArgument(parts.length == 2);
- String hostname = parts[0];
- int port = Integer.parseInt(parts[1]);
- return new InetSocketAddress(hostname, port);
- }
-
- public static InetSocketAddress getSocketAddress(int port) throws UnknownHostException {
- return new InetSocketAddress(InetAddress.getLocalHost().getHostAddress(), port);
- }
-
- /**
- * Convert inet socket address to the string representation.
- *
- * @param address
- * inet socket address.
- * @return string representation of inet socket address.
- */
- public static String toString(InetSocketAddress address) {
- StringBuilder sb = new StringBuilder();
- sb.append(address.getHostName()).append(COLON).append(address.getPort());
- return sb.toString();
- }
-
- public static String toLockId(InetSocketAddress address, int shard) {
- StringBuilder sb = new StringBuilder();
- sb.append(VERSION).append(SEP).append(shard).append(SEP).append(toString(address));
- return sb.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
deleted file mode 100644
index 9f30815..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClient.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecordSetBuffer;
-import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * Interface for distributedlog client.
- */
-public interface DistributedLogClient {
- /**
- * Write <i>data</i> to a given <i>stream</i>.
- *
- * @param stream
- * Stream Name.
- * @param data
- * Data to write.
- * @return a future representing a sequence id returned for this write.
- */
- Future<DLSN> write(String stream, ByteBuffer data);
-
- /**
- * Write record set to a given <i>stream</i>.
- *
- * <p>The record set is built from {@link org.apache.distributedlog.LogRecordSet.Writer}
- *
- * @param stream stream to write to
- * @param recordSet record set
- */
- Future<DLSN> writeRecordSet(String stream, LogRecordSetBuffer recordSet);
-
- /**
- * Write <i>data</i> in bulk to a given <i>stream</i>.
- *
- * <p>Return a list of Future dlsns, one for each submitted buffer. In the event of a partial
- * failure--ex. some specific buffer write fails, all subsequent writes
- * will also fail.
- *
- * @param stream
- * Stream Name.
- * @param data
- * Data to write.
- * @return a list of futures, one for each submitted buffer.
- */
- List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data);
-
- /**
- * Truncate the stream to a given <i>dlsn</i>.
- *
- * @param stream
- * Stream Name.
- * @param dlsn
- * DLSN to truncate until.
- * @return a future representing the truncation.
- */
- Future<Boolean> truncate(String stream, DLSN dlsn);
-
- /**
- * Release the ownership of a stream <i>stream</i>.
- *
- * @param stream
- * Stream Name to release.
- * @return a future representing the release operation.
- */
- Future<Void> release(String stream);
-
- /**
- * Delete a given stream <i>stream</i>.
- *
- * @param stream
- * Stream Name to delete.
- * @return a future representing the delete operation.
- */
- Future<Void> delete(String stream);
-
- /**
- * Create a stream with name <i>stream</i>.
- *
- * @param stream
- * Stream Name to create.
- * @return a future representing the create operation.
- */
- Future<Void> create(String stream);
-
- /**
- * Close the client.
- */
- void close();
-}