You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:14 UTC
[07/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
deleted file mode 100644
index 3c53ccf..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/SimpleBalancer.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import com.google.common.base.Optional;
-import com.google.common.util.concurrent.RateLimiter;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.DistributedLogClient;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A balancer that balances stream ownerships between two targets.
- *
- * <p>Each target is identified by a name and is reached through its own
- * {@link DistributedLogClient} and {@link MonitorServiceClient}.
- */
-public class SimpleBalancer implements Balancer {
-
-    private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
-
-    protected final String target1;
-    protected final String target2;
-    protected final DistributedLogClient targetClient1;
-    protected final DistributedLogClient targetClient2;
-    protected final MonitorServiceClient targetMonitor1;
-    protected final MonitorServiceClient targetMonitor2;
-
-    /**
-     * Create a balancer between two named targets.
-     *
-     * @param name1 name of the first target
-     * @param client1 client of the first target
-     * @param monitor1 monitor client of the first target
-     * @param name2 name of the second target
-     * @param client2 client of the second target
-     * @param monitor2 monitor client of the second target
-     */
-    public SimpleBalancer(String name1,
-                          DistributedLogClient client1,
-                          MonitorServiceClient monitor1,
-                          String name2,
-                          DistributedLogClient client2,
-                          MonitorServiceClient monitor2) {
-        this.target1 = name1;
-        this.targetClient1 = client1;
-        this.targetMonitor1 = monitor1;
-        this.target2 = name2;
-        this.targetClient2 = client2;
-        this.targetMonitor2 = monitor2;
-    }
-
-    /**
-     * Count the total number of streams in an ownership distribution.
-     *
-     * @param distribution stream names grouped by the address owning them
-     * @return sum of the sizes of all stream sets
-     */
-    protected static int countNumberStreams(Map<SocketAddress, Set<String>> distribution) {
-        int count = 0;
-        for (Set<String> streams : distribution.values()) {
-            count += streams.size();
-        }
-        return count;
-    }
-
-    /**
-     * Move streams from the target owning more streams to the other target.
-     *
-     * <p>The number of streams to move is computed by
-     * {@code BalancerUtils.calculateNumStreamsToRebalance}; nothing is moved when
-     * that number is not positive. When both targets own the same number of
-     * streams, the second target is treated as the source.
-     *
-     * @param rebalanceWaterMark water mark passed to the rebalance calculation
-     * @param rebalanceTolerancePercentage tolerance passed to the rebalance calculation
-     * @param rebalanceConcurrency number of threads moving streams concurrently
-     * @param rebalanceRateLimiter optional rate limiter acquired before each stream is chosen
-     */
-    @Override
-    public void balance(int rebalanceWaterMark,
-                        double rebalanceTolerancePercentage,
-                        int rebalanceConcurrency,
-                        Optional<RateLimiter> rebalanceRateLimiter) {
-        // get the ownership distributions from individual targets
-        Map<SocketAddress, Set<String>> distribution1 = targetMonitor1.getStreamOwnershipDistribution();
-        Map<SocketAddress, Set<String>> distribution2 = targetMonitor2.getStreamOwnershipDistribution();
-
-        // get stream counts
-        int proxyCount1 = distribution1.size();
-        int streamCount1 = countNumberStreams(distribution1);
-        int proxyCount2 = distribution2.size();
-        int streamCount2 = countNumberStreams(distribution2);
-
-        logger.info("'{}' has {} streams by {} proxies; while '{}' has {} streams by {} proxies.",
-            new Object[] {target1, streamCount1, proxyCount1, target2, streamCount2, proxyCount2 });
-
-        // the target owning more streams becomes the source of the move
-        String source, target;
-        Map<SocketAddress, Set<String>> srcDistribution;
-        DistributedLogClient srcClient, targetClient;
-        MonitorServiceClient srcMonitor, targetMonitor;
-        int srcStreamCount, targetStreamCount;
-        if (streamCount1 > streamCount2) {
-            source = target1;
-            srcStreamCount = streamCount1;
-            srcClient = targetClient1;
-            srcMonitor = targetMonitor1;
-            srcDistribution = distribution1;
-
-            target = target2;
-            targetStreamCount = streamCount2;
-            targetClient = targetClient2;
-            targetMonitor = targetMonitor2;
-        } else {
-            source = target2;
-            srcStreamCount = streamCount2;
-            srcClient = targetClient2;
-            srcMonitor = targetMonitor2;
-            srcDistribution = distribution2;
-
-            target = target1;
-            targetStreamCount = streamCount1;
-            targetClient = targetClient1;
-            targetMonitor = targetMonitor1;
-        }
-
-        Map<String, Integer> loadDistribution = new HashMap<String, Integer>();
-        loadDistribution.put(source, srcStreamCount);
-        loadDistribution.put(target, targetStreamCount);
-
-        // Calculate how many streams to be rebalanced from src region to target region
-        int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(
-            source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
-
-        if (numStreamsToRebalance <= 0) {
-            logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target);
-            return;
-        }
-
-        StreamChooser streamChooser =
-            LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance);
-        StreamMover streamMover =
-            new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
-
-        moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
-    }
-
-    /**
-     * Move the streams owned by <i>source</i> to the other target.
-     *
-     * <p>Streams are taken from a {@code CountBasedStreamChooser} built on the
-     * source's ownership distribution until it returns {@code null}. Nothing is
-     * done when the distribution is empty.
-     *
-     * @param source name of the target to move streams away from
-     * @param rebalanceConcurrency number of threads moving streams concurrently
-     * @param rebalanceRateLimiter optional rate limiter acquired before each stream is chosen
-     * @throws IllegalArgumentException if <i>source</i> is neither of the two targets
-     */
-    @Override
-    public void balanceAll(String source,
-                           int rebalanceConcurrency,
-                           Optional<RateLimiter> rebalanceRateLimiter) {
-        String target;
-        DistributedLogClient sourceClient, targetClient;
-        MonitorServiceClient sourceMonitor, targetMonitor;
-        if (target1.equals(source)) {
-            sourceClient = targetClient1;
-            sourceMonitor = targetMonitor1;
-            target = target2;
-            targetClient = targetClient2;
-            targetMonitor = targetMonitor2;
-        } else if (target2.equals(source)) {
-            sourceClient = targetClient2;
-            sourceMonitor = targetMonitor2;
-            target = target1;
-            targetClient = targetClient1;
-            targetMonitor = targetMonitor1;
-        } else {
-            throw new IllegalArgumentException("Unknown target " + source);
-        }
-
-        // get the ownership distributions from individual targets
-        Map<SocketAddress, Set<String>> distribution = sourceMonitor.getStreamOwnershipDistribution();
-
-        if (distribution.isEmpty()) {
-            return;
-        }
-
-        StreamChooser streamChooser = new CountBasedStreamChooser(distribution);
-        StreamMover streamMover =
-            new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
-
-        moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
-    }
-
-    /**
-     * Run <i>concurrency</i> workers that move the chosen streams and block until
-     * all of them are done.
-     *
-     * <p>If the calling thread is interrupted while waiting, the workers are asked
-     * to stop after their current stream and the interrupt status is restored.
-     */
-    private void moveStreams(StreamChooser streamChooser,
-                             StreamMover streamMover,
-                             int concurrency,
-                             Optional<RateLimiter> rateLimiter) {
-        CountDownLatch doneLatch = new CountDownLatch(concurrency);
-        RegionMover regionMover = new RegionMover(streamChooser, streamMover, rateLimiter, doneLatch);
-        ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
-        try {
-            // the same mover instance is submitted once per worker thread
-            for (int i = 0; i < concurrency; i++) {
-                executorService.submit(regionMover);
-            }
-
-            try {
-                doneLatch.await();
-            } catch (InterruptedException e) {
-                logger.info("{} is interrupted. Stopping it ...", streamMover);
-                regionMover.shutdown();
-                // restore the interrupt status so that callers can observe the interruption
-                Thread.currentThread().interrupt();
-            }
-        } finally {
-            executorService.shutdown();
-        }
-
-    }
-
-    /**
-     * Move streams from <i>src</i> region to <i>target</i> region.
-     *
-     * <p>One instance is submitted to the executor several times, so {@link #run()}
-     * executes on several threads at once against the same chooser and mover.
-     * NOTE(review): confirm that the chooser and mover implementations are safe
-     * for concurrent use.
-     */
-    static class RegionMover implements Runnable {
-
-        final StreamChooser streamChooser;
-        final StreamMover streamMover;
-        final Optional<RateLimiter> rateLimiter;
-        final CountDownLatch doneLatch;
-        volatile boolean running = true;
-
-        RegionMover(StreamChooser streamChooser,
-                    StreamMover streamMover,
-                    Optional<RateLimiter> rateLimiter,
-                    CountDownLatch doneLatch) {
-            this.streamChooser = streamChooser;
-            this.streamMover = streamMover;
-            this.rateLimiter = rateLimiter;
-            this.doneLatch = doneLatch;
-        }
-
-        /**
-         * Keep moving chosen streams until the chooser returns {@code null} or
-         * {@link #shutdown()} is called, then count down the latch.
-         */
-        @Override
-        public void run() {
-            try {
-                while (running) {
-                    if (rateLimiter.isPresent()) {
-                        rateLimiter.get().acquire();
-                    }
-
-                    String stream = streamChooser.choose();
-                    if (null == stream) {
-                        break;
-                    }
-
-                    streamMover.moveStream(stream);
-                }
-            } finally {
-                // always release the latch: the thread waiting on it would block
-                // forever if choosing or moving a stream threw an exception
-                doneLatch.countDown();
-            }
-        }
-
-        /**
-         * Ask the workers to stop once they finish the stream they are moving.
-         */
-        void shutdown() {
-            running = false;
-        }
-    }
-
-    @Override
-    public void close() {
-        // no-op
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
deleted file mode 100644
index 1d7b6f7..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamChooser.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-/**
- * Chooses the streams to rebalance, one stream per call.
- */
-public interface StreamChooser {
- /**
- * Choose the next stream to rebalance.
- *
- * @return the chosen stream. Callers such as <code>SimpleBalancer.RegionMover</code>
- * treat a <code>null</code> return as "no more streams to move".
- */
- String choose();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
deleted file mode 100644
index 4a04530..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMover.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-/**
- * A stream mover to move streams between proxies.
- */
-public interface StreamMover {
-
- /**
- * Move given stream <i>streamName</i>.
- *
- * <p>Failures are reported through the return value; the method declares no checked exception.
- *
- * @param streamName
- * stream name to move
- * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
- */
- boolean moveStream(final String streamName);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
deleted file mode 100644
index 68d934b..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/StreamMoverImpl.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.balancer;
-
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.service.DistributedLogClient;
-import com.twitter.util.Await;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Move Streams from <i>src</i> to <i>target</i>.
- *
- * <p>A stream is moved by releasing it through the source client and then
- * calling {@code check} for it on the target monitor client.
- */
-public class StreamMoverImpl implements StreamMover {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamMoverImpl.class);
-
-    final String source, target;
-    final DistributedLogClient srcClient, targetClient;
-    final MonitorServiceClient srcMonitor, targetMonitor;
-
-    /**
-     * Create a mover that moves streams from <i>source</i> to <i>target</i>.
-     *
-     * @param source name of the source
-     * @param srcClient client of the source
-     * @param srcMonitor monitor client of the source
-     * @param target name of the target
-     * @param targetClient client of the target
-     * @param targetMonitor monitor client of the target
-     */
-    public StreamMoverImpl(String source, DistributedLogClient srcClient, MonitorServiceClient srcMonitor,
-                           String target, DistributedLogClient targetClient, MonitorServiceClient targetMonitor) {
-        this.source = source;
-        this.srcClient = srcClient;
-        this.srcMonitor = srcMonitor;
-        this.target = target;
-        this.targetClient = targetClient;
-        this.targetMonitor = targetMonitor;
-    }
-
-    /**
-     * Move given stream <i>streamName</i>.
-     *
-     * <p>This method never throws: a failed release, a failed check or an
-     * interruption of the calling thread all result in <i>false</i>. On
-     * interruption the thread's interrupt status is restored.
-     *
-     * @param streamName
-     *          stream name to move
-     * @return <i>true</i> if successfully moved the stream, <i>false</i> when failure happens.
-     */
-    @Override
-    public boolean moveStream(final String streamName) {
-        try {
-            doMoveStream(streamName);
-            return true;
-        } catch (InterruptedException e) {
-            logger.info("Interrupted while moving stream {} from {} to {}.",
-                    new Object[]{streamName, source, target});
-            // keep the interruption visible to the caller, e.g. the balancer's worker loop
-            Thread.currentThread().interrupt();
-            return false;
-        } catch (Exception e) {
-            // the failure has already been logged by the listener in doMoveStream
-            return false;
-        }
-    }
-
-    /**
-     * Release the stream from the source, check it on the target and wait for the outcome.
-     *
-     * <p>The logging listener is attached to the combined future so that a
-     * failed release is logged as well as a failed check.
-     */
-    private void doMoveStream(final String streamName) throws Exception {
-        Future<Void> moveFuture = srcClient.release(streamName).flatMap(new Function<Void, Future<Void>>() {
-            @Override
-            public Future<Void> apply(Void result) {
-                return targetMonitor.check(streamName);
-            }
-        });
-        Await.result(moveFuture.addEventListener(new FutureEventListener<Void>() {
-            @Override
-            public void onSuccess(Void value) {
-                logger.info("Moved stream {} from {} to {}.",
-                        new Object[]{streamName, source, target});
-            }
-
-            @Override
-            public void onFailure(Throwable cause) {
-                // the trailing throwable has no placeholder, so SLF4J logs it as the exception
-                logger.info("Failed to move stream {} from region {} to {} : ",
-                        new Object[]{streamName, source, target, cause});
-            }
-        }));
-    }
-
-    @Override
-    public String toString() {
-        StringBuilder sb = new StringBuilder();
-        sb.append("StreamMover('").append(source).append("' -> '").append(target).append("')");
-        return sb.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
deleted file mode 100644
index 9eb8950..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Balancer to move streams around to balance the traffic.
- */
-package org.apache.distributedlog.service.balancer;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
deleted file mode 100644
index 7d72093..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
-import org.apache.distributedlog.config.ConfigurationSubscription;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.config.FileConfigurationBuilder;
-import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
-import java.io.File;
-import java.net.MalformedURLException;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Stream config provider that hands out one shared dynamic configuration,
- * loaded from a single config file, for every stream.
- */
-public class DefaultStreamConfigProvider implements StreamConfigProvider {
-
-    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
-
-    private final Optional<DynamicDistributedLogConfiguration> dynConf;
-    private final ConfigurationSubscription confSub;
-
-    /**
-     * Build the shared dynamic configuration and subscribe it to the given config file.
-     *
-     * @param configFilePath path of the properties file backing the configuration
-     * @param executorService executor handed to the configuration subscription
-     * @param reloadPeriod reload period handed to the configuration subscription
-     * @param reloadUnit time unit of <i>reloadPeriod</i>
-     * @throws ConfigurationException if the file path cannot be turned into a URL,
-     *          or if building the configuration fails
-     */
-    public DefaultStreamConfigProvider(String configFilePath,
-                                       ScheduledExecutorService executorService,
-                                       int reloadPeriod,
-                                       TimeUnit reloadUnit)
-            throws ConfigurationException {
-        try {
-            final FileConfigurationBuilder fileBuilder =
-                    new PropertiesConfigurationBuilder(new File(configFilePath).toURI().toURL());
-            // the dynamic configuration sits on top of a constant copy of the default settings
-            final DynamicDistributedLogConfiguration sharedConf =
-                    new DynamicDistributedLogConfiguration(
-                            new ConcurrentConstConfiguration(new DistributedLogConfiguration()));
-            final List<FileConfigurationBuilder> builders = Lists.newArrayList(fileBuilder);
-            this.confSub = new ConfigurationSubscription(
-                    sharedConf, builders, executorService, reloadPeriod, reloadUnit);
-            this.dynConf = Optional.of(sharedConf);
-        } catch (MalformedURLException urlError) {
-            throw new ConfigurationException(urlError);
-        }
-    }
-
-    /**
-     * Return the shared dynamic configuration; the stream name is ignored.
-     */
-    @Override
-    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
-        return this.dynConf;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
deleted file mode 100644
index 195f29d..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/NullStreamConfigProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Stream config provider that has no per-stream configuration: every lookup
- * yields an absent value.
- */
-public class NullStreamConfigProvider implements StreamConfigProvider {
-    static final Logger LOG = LoggerFactory.getLogger(NullStreamConfigProvider.class);
-
-    // single shared "absent" instance returned for every stream
-    private static final Optional<DynamicDistributedLogConfiguration> ABSENT_CONF = Optional.absent();
-
-    /**
-     * Return an absent configuration; the stream name is ignored.
-     */
-    @Override
-    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
-        return ABSENT_CONF;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
deleted file mode 100644
index 257b4be..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.CompositeConfiguration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.SystemConfiguration;
-
-/**
- * Configuration for DistributedLog Server.
- */
-public class ServerConfiguration extends CompositeConfiguration {
-
- private static ClassLoader defaultLoader;
-
- static {
- defaultLoader = Thread.currentThread().getContextClassLoader();
- if (null == defaultLoader) {
- defaultLoader = DistributedLogConfiguration.class.getClassLoader();
- }
- }
-
- // Server DLSN version (version used to encode DLSNs; defaults to DLSN.VERSION1)
- protected static final String SERVER_DLSN_VERSION = "server_dlsn_version";
- protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
-
- // Server Durable Write Enable/Disable Flag (enabled by default)
- protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
- protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
-
- // Server Region Id (defaults to the local region id)
- protected static final String SERVER_REGION_ID = "server_region_id";
- protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
-
- // Server Port
- protected static final String SERVER_PORT = "server_port";
- protected static final int SERVER_PORT_DEFAULT = 0;
-
- // Server Shard Id
- protected static final String SERVER_SHARD_ID = "server_shard";
- protected static final int SERVER_SHARD_ID_DEFAULT = -1;
-
- // Server Threads (defaults to the number of available processors)
- protected static final String SERVER_NUM_THREADS = "server_threads";
- protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
-
- // Server enable per stream stat (enabled by default)
- protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
- protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
-
- // Server graceful shutdown period (in millis)
- protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
- protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
-
- // Server service timeout (in millis)
- // NOTE(review): the *_OLD key is presumably a legacy name kept for backward compatibility - confirm at its use site
- public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms";
- public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = "serviceTimeoutMs";
- public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0;
-
- // Server close writer timeout (in millis; defaults to 1 second)
- public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = "server_writer_close_timeout_ms";
- public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000;
-
- // Server stream probation timeout (in millis; defaults to 5 minutes)
- // NOTE(review): the *_OLD key is presumably a legacy name kept for backward compatibility - confirm at its use site
- public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms";
- public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs";
- public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5;
-
- // Server stream to partition converter (configuration key of the converter class; no default constant is declared here)
- protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
-
- // Use hostname as the allocator pool name (disabled by default)
- protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME =
- "server_use_hostname_as_allocator_pool_name";
- protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
- // Configure refresh interval for calculating resource placement in seconds (defaults to 120 seconds)
- public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S =
- "server_resource_placement_refresh_interval_sec";
- public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
-
- /**
-  * Create a server configuration that starts out with a {@link SystemConfiguration} added to it.
-  */
- public ServerConfiguration() {
-     // the superclass no-arg constructor is invoked implicitly
-     this.addConfiguration(new SystemConfiguration());
- }
-
- /**
-  * Add the given {@link DistributedLogConfiguration} to this composite configuration.
-  *
-  * @param dlConf
-  *          distributedlog configuration to add
-  */
- public void loadConf(DistributedLogConfiguration dlConf) {
-     this.addConfiguration(dlConf);
- }
-
- /**
-  * Store the version used to encode dlsn.
-  *
-  * @param version
-  *          dlsn version
-  * @return this server configuration, to allow chaining
-  */
- public ServerConfiguration setDlsnVersion(byte version) {
-     this.setProperty(SERVER_DLSN_VERSION, Byte.valueOf(version));
-     return this;
- }
-
- /**
-  * Read the version used to encode dlsn, falling back to the default version when unset.
-  *
-  * @see DLSN
-  * @return version to encode dlsn.
-  */
- public byte getDlsnVersion() {
-     final byte dlsnVersion = getByte(SERVER_DLSN_VERSION, SERVER_DLSN_VERSION_DEFAULT);
-     return dlsnVersion;
- }
-
- /**
-  * Turn durable write on or off.
-  *
-  * @param enabled
-  *          true to enable durable write, false to disable it
-  * @return this server configuration, to allow chaining
-  */
- public ServerConfiguration enableDurableWrite(boolean enabled) {
-     this.setProperty(SERVER_DURABLE_WRITE_ENABLED, Boolean.valueOf(enabled));
-     return this;
- }
-
- /**
-  * Tell whether durable write is enabled; it is enabled when the setting is absent.
-  *
-  * @return true if waiting writes to be durable. otherwise false.
-  */
- public boolean isDurableWriteEnabled() {
-     final boolean durableWrite =
-             getBoolean(SERVER_DURABLE_WRITE_ENABLED, SERVER_DURABLE_WRITE_ENABLED_DEFAULT);
-     return durableWrite;
- }
-
- /**
-  * Store the region id used to instantiate DistributedLogNamespace.
-  *
-  * @param regionId
-  *          region id
-  * @return this server configuration, to allow chaining
-  */
- public ServerConfiguration setRegionId(int regionId) {
-     this.setProperty(SERVER_REGION_ID, Integer.valueOf(regionId));
-     return this;
- }
-
- /**
-  * Read the region id used to instantiate
-  * {@link org.apache.distributedlog.namespace.DistributedLogNamespace}, falling back
-  * to the local region id when unset.
-  *
-  * @return region id used to instantiate DistributedLogNamespace
-  */
- public int getRegionId() {
-     final int regionId = getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT);
-     return regionId;
- }
-
- /**
-  * Store the port this service runs on.
-  *
-  * @param port
-  *          server port
-  * @return this server configuration, to allow chaining
-  */
- public ServerConfiguration setServerPort(int port) {
-     this.setProperty(SERVER_PORT, Integer.valueOf(port));
-     return this;
- }
-
- /**
-  * Read the port this service runs on; 0 when unset.
-  *
-  * @return server port
-  */
- public int getServerPort() {
-     final int port = getInt(SERVER_PORT, SERVER_PORT_DEFAULT);
-     return port;
- }
-
- /**
-  * Store the shard id of this server.
-  *
-  * @param shardId
-  *          shard id
-  * @return this server configuration, to allow chaining
-  */
- public ServerConfiguration setServerShardId(int shardId) {
-     this.setProperty(SERVER_SHARD_ID, Integer.valueOf(shardId));
-     return this;
- }
-
- /**
-  * Read the shard id of this server; -1 when unset.
-  *
-  * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
-  *
-  * @return shard id of this server.
-  */
- public int getServerShardId() {
-     final int shardId = getInt(SERVER_SHARD_ID, SERVER_SHARD_ID_DEFAULT);
-     return shardId;
- }
-
-    /**
-     * Look up how many threads the executor of this server uses.
-     *
-     * <p>{@code SERVER_NUM_THREADS_DEFAULT} is passed as the default for the lookup.
-     *
-     * @return number of threads for the executor running in this server.
-     */
-    public int getServerThreads() {
-        final int numThreads = this.getInt(SERVER_NUM_THREADS, SERVER_NUM_THREADS_DEFAULT);
-        return numThreads;
-    }
-
-    /**
-     * Record how many threads the executor of this server should use.
-     *
-     * @param numThreads
-     *          number of threads for the executor running in this server.
-     * @return this server configuration, so that calls can be chained
-     */
-    public ServerConfiguration setServerThreads(int numThreads) {
-        this.setProperty(SERVER_NUM_THREADS, numThreads);
-        return this;
-    }
-
-    /**
-     * Switch per stream stat on or off.
-     *
-     * @param enabled
-     *          {@code true} to enable per stream stat, {@code false} to disable it
-     * @return this server configuration, so that calls can be chained
-     */
-    public ServerConfiguration setPerStreamStatEnabled(boolean enabled) {
-        this.setProperty(SERVER_ENABLE_PERSTREAM_STAT, enabled);
-        return this;
-    }
-
- /**
- * Whether per stream stat is enabled or not in this server.
- *
- * @return true if per stream stat is enabled, otherwise false.
- */
- public boolean isPerStreamStatEnabled() {
- return getBoolean(SERVER_ENABLE_PERSTREAM_STAT, SERVER_ENABLE_PERSTREAM_STAT_DEFAULT);
- }
-
-    /**
-     * Record the graceful shutdown period, expressed in milliseconds.
-     *
-     * @param periodMs
-     *          graceful shutdown period in millis.
-     * @return this server configuration, so that calls can be chained
-     */
-    public ServerConfiguration setGracefulShutdownPeriodMs(long periodMs) {
-        this.setProperty(SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS, periodMs);
-        return this;
-    }
-
-    /**
-     * Look up the graceful shutdown period, expressed in milliseconds.
-     *
-     * <p>{@code SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT} is passed as the default for the
-     * lookup.
-     *
-     * @return graceful shutdown period in millis.
-     */
-    public long getGracefulShutdownPeriodMs() {
-        final long periodMs = this.getLong(
-            SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS,
-            SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT);
-        return periodMs;
-    }
-
-    /**
-     * Look up the timeout applied to stream op execution in the proxy layer.
-     *
-     * <p>0 disables timeout.
-     *
-     * <p>The value stored under {@code SERVER_SERVICE_TIMEOUT_MS_OLD} (or
-     * {@code SERVER_SERVICE_TIMEOUT_MS_DEFAULT}) is resolved first and then handed to the
-     * lookup of {@code SERVER_SERVICE_TIMEOUT_MS} as its default.
-     *
-     * @return timeout for stream operation in proxy layer.
-     */
-    public long getServiceTimeoutMs() {
-        final long fallbackTimeoutMs =
-            this.getLong(SERVER_SERVICE_TIMEOUT_MS_OLD, SERVER_SERVICE_TIMEOUT_MS_DEFAULT);
-        return this.getLong(SERVER_SERVICE_TIMEOUT_MS, fallbackTimeoutMs);
-    }
-
- /**
- * Set timeout for stream op execution in proxy layer.
- *
- * <p>0 disables timeout.
- *
- * @param timeoutMs
- * timeout for stream operation in proxy layer.
- * @return server configuration
- */
- public ServerConfiguration setServiceTimeoutMs(long timeoutMs) {
- setProperty(SERVER_SERVICE_TIMEOUT_MS, timeoutMs);
- return this;
- }
-
-    /**
-     * Look up the timeout applied when closing a writer in the proxy layer.
-     *
-     * <p>0 disables timeout.
-     *
-     * <p>{@code SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT} is passed as the default for the lookup.
-     *
-     * @return timeout for closing writer in proxy layer.
-     */
-    public long getWriterCloseTimeoutMs() {
-        final long closeTimeoutMs =
-            this.getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT);
-        return closeTimeoutMs;
-    }
-
- /**
- * Set timeout for closing writer in proxy layer.
- *
- * <p>0 disables timeout.
- *
- * @param timeoutMs
- * timeout for closing writer in proxy layer.
- * @return server configuration
- */
- public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) {
- setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs);
- return this;
- }
-
-    /**
-     * Look up how long a stream is kept in cache in probationary state after a service timeout.
-     *
-     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
-     *
-     * <p>The value stored under {@code SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD} (or
-     * {@code SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT}) is resolved first and then handed to
-     * the lookup of {@code SERVER_STREAM_PROBATION_TIMEOUT_MS} as its default.
-     *
-     * @return stream probation timeout in ms.
-     */
-    public long getStreamProbationTimeoutMs() {
-        final long fallbackTimeoutMs = this.getLong(
-            SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD,
-            SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT);
-        return this.getLong(SERVER_STREAM_PROBATION_TIMEOUT_MS, fallbackTimeoutMs);
-    }
-
-    /**
-     * Record how long a stream is kept in cache in probationary state after a service timeout.
-     *
-     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
-     *
-     * @param timeoutMs probation timeout in ms.
-     * @return this server configuration, so that calls can be chained
-     */
-    public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) {
-        this.setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs);
-        return this;
-    }
-
-    /**
-     * Record which stream partition converter class to use.
-     *
-     * <p>The class is stored by its name, as returned by {@link Class#getName()}.
-     *
-     * @param converterClass
-     *          stream partition converter class
-     * @return this server configuration, so that calls can be chained
-     */
-    public ServerConfiguration setStreamPartitionConverterClass(
-        Class<? extends StreamPartitionConverter> converterClass) {
-        final String converterClassName = converterClass.getName();
-        this.setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClassName);
-        return this;
-    }
-
-    /**
-     * Resolve the stream partition converter class.
-     *
-     * <p>The lookup is delegated to {@code ReflectionUtils.getClass}, with
-     * {@link IdentityStreamPartitionConverter} passed as the default class.
-     *
-     * @return the stream partition converter class.
-     * @throws ConfigurationException propagated from {@code ReflectionUtils.getClass};
-     *          presumably raised when the configured class cannot be resolved as a
-     *          {@link StreamPartitionConverter} (confirm against ReflectionUtils)
-     */
-    public Class<? extends StreamPartitionConverter> getStreamPartitionConverterClass()
-        throws ConfigurationException {
-        final Class<? extends StreamPartitionConverter> converterClass = ReflectionUtils.getClass(
-            this,
-            SERVER_STREAM_PARTITION_CONVERTER_CLASS,
-            IdentityStreamPartitionConverter.class,
-            StreamPartitionConverter.class,
-            defaultLoader);
-        return converterClass;
-    }
-
-    /**
-     * Choose whether the hostname is used as the allocator pool name.
-     *
-     * @param useHostname whether to use hostname as the allocator pool name.
-     * @return this server configuration, so that calls can be chained
-     * @see #isUseHostnameAsAllocatorPoolName()
-     */
-    public ServerConfiguration setUseHostnameAsAllocatorPoolName(boolean useHostname) {
-        this.setProperty(SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME, useHostname);
-        return this;
-    }
-
-    /**
-     * Tell whether the hostname is used as the allocator pool name.
-     *
-     * <p>{@code SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT} is passed as the default
-     * for the lookup.
-     *
-     * @return true if use hostname as the allocator pool name. otherwise, use
-     *          {@link #getServerShardId()} as the allocator pool name.
-     * @see #getServerShardId()
-     */
-    public boolean isUseHostnameAsAllocatorPoolName() {
-        final boolean useHostname = this.getBoolean(
-            SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME,
-            SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT);
-        return useHostname;
-    }
-
-    /**
-     * Record the resource placement refresh interval.
-     *
-     * @param refreshIntervalSecs
-     *          refresh interval; the parameter and property names indicate seconds
-     * @return this server configuration, so that calls can be chained
-     */
-    public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) {
-        this.setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs);
-        return this;
-    }
-
-    /**
-     * Look up the resource placement refresh interval.
-     *
-     * <p>{@code SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT} is passed as the default
-     * for the lookup.
-     *
-     * @return refresh interval; the property name indicates seconds
-     */
-    public int getResourcePlacementRefreshInterval() {
-        final int refreshIntervalSecs = this.getInt(
-            SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S,
-            SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT);
-        return refreshIntervalSecs;
-    }
-
- /**
- * Validate the configuration.
- *
- * <p>Checks that the dlsn version lies between {@code DLSN.VERSION0} and {@code DLSN.VERSION1}
- * (both inclusive), that the number of server threads is positive and that the server shard
- * id is not negative.
- *
- * <p>NOTE(review): the checks go through {@code checkArgument}; if that is Guava's
- * {@code Preconditions.checkArgument}, a failed check raises {@link IllegalArgumentException}
- * rather than {@link IllegalStateException} - confirm and correct the tag below.
- *
- * @throws IllegalStateException when there are any invalid settings.
- */
- public void validate() {
- byte dlsnVersion = getDlsnVersion();
- checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
- "Unknown dlsn version " + dlsnVersion);
- checkArgument(getServerThreads() > 0,
- "Invalid number of server threads : " + getServerThreads());
- checkArgument(getServerShardId() >= 0,
- "Invalid server shard id : " + getServerShardId());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
deleted file mode 100644
index 29052f9..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/ServiceStreamConfigProvider.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.config.DynamicConfigurationFactory;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
-import java.io.File;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provide per stream configuration to DistributedLog service layer.
- *
- * <p>The configuration of a stream is looked up at {@code <configPath>/<name>.conf}, where
- * {@code <name>} is the stream name produced by the {@link StreamPartitionConverter} for the
- * requested stream. The default dynamic configuration is handed to the factory together with
- * every per-stream lookup.
- */
-public class ServiceStreamConfigProvider implements StreamConfigProvider {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
-
-    private static final String CONFIG_EXTENSION = "conf";
-
-    private final File configBaseDir;
-    private final File defaultConfigFile;
-    private final StreamPartitionConverter partitionConverter;
-    private final DynamicConfigurationFactory configFactory;
-    private final DynamicDistributedLogConfiguration defaultDynConf;
-
-    /**
-     * Create a provider backed by a directory of per-stream configuration files.
-     *
-     * @param configPath
-     *          base directory holding the per-stream configuration files; must exist
-     * @param defaultConfigPath
-     *          path of the default configuration file; must exist
-     * @param partitionConverter
-     *          converter used to derive the configuration name from a stream name
-     * @param executorService
-     *          executor handed to the {@link DynamicConfigurationFactory}
-     * @param reloadPeriod
-     *          reload period handed to the {@link DynamicConfigurationFactory}
-     * @param reloadUnit
-     *          time unit of {@code reloadPeriod}
-     * @throws ConfigurationException
-     *          if {@code configPath} or {@code defaultConfigPath} does not exist, or if it is
-     *          propagated while loading the default configuration
-     */
-    public ServiceStreamConfigProvider(String configPath,
-                                       String defaultConfigPath,
-                                       StreamPartitionConverter partitionConverter,
-                                       ScheduledExecutorService executorService,
-                                       int reloadPeriod,
-                                       TimeUnit reloadUnit)
-        throws ConfigurationException {
-        this.configBaseDir = new File(configPath);
-        if (!configBaseDir.exists()) {
-            throw new ConfigurationException("Stream configuration base directory "
-                + configPath + " does not exist");
-        }
-        // Check the default config file itself, not the base directory a second time; the
-        // unconditional get() below relies on this file being present.
-        this.defaultConfigFile = new File(defaultConfigPath);
-        if (!defaultConfigFile.exists()) {
-            throw new ConfigurationException("Stream configuration default config "
-                + defaultConfigPath + " does not exist");
-        }
-
-        // Construct reloading default configuration
-        this.partitionConverter = partitionConverter;
-        this.configFactory = new DynamicConfigurationFactory(executorService, reloadPeriod, reloadUnit);
-        // We know it exists from the check above.
-        this.defaultDynConf = configFactory.getDynamicConfiguration(defaultConfigPath).get();
-    }
-
-    /**
-     * Get the dynamic configuration of a stream.
-     *
-     * <p>A {@link ConfigurationException} raised by the factory is logged as a warning and
-     * results in an absent value rather than being propagated.
-     *
-     * @param streamName stream name to return config for
-     * @return whatever the factory returns for the stream's configuration path, or absent when
-     *          the lookup failed with a {@link ConfigurationException}
-     */
-    @Override
-    public Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName) {
-        String configName = partitionConverter.convert(streamName).getStream();
-        String configPath = getConfigPath(configName);
-        Optional<DynamicDistributedLogConfiguration> dynConf = Optional.<DynamicDistributedLogConfiguration>absent();
-        try {
-            dynConf = configFactory.getDynamicConfiguration(configPath, defaultDynConf);
-        } catch (ConfigurationException ex) {
-            LOG.warn("Configuration exception for stream {} ({}) at {}",
-                new Object[] {streamName, configName, configPath, ex});
-        }
-        return dynConf;
-    }
-
-    /**
-     * Build the path of the configuration file for the given configuration name.
-     *
-     * @param configName name of the configuration, without extension
-     * @return path of {@code <configName>.conf} under the configuration base directory
-     */
-    private String getConfigPath(String configName) {
-        return new File(configBaseDir, String.format("%s.%s", configName, CONFIG_EXTENSION)).getPath();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
deleted file mode 100644
index c704f70..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/StreamConfigProvider.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.config;
-
-import com.google.common.base.Optional;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-
-/**
- * Expose per-stream configs to dl proxy.
- *
- * <p>Implementations map a stream name to an optional dynamic configuration.
- */
-public interface StreamConfigProvider {
- /**
- * Get dynamic per stream config overrides for a given stream.
- *
- * @param streamName stream name to return config for
- * @return the dynamic configuration instance for the stream, wrapped in an {@code Optional}
- */
- Optional<DynamicDistributedLogConfiguration> getDynamicStreamConfig(String streamName);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java
deleted file mode 100644
index b07605e..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/config/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Server Configurations.
- */
-package org.apache.distributedlog.service.config;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java
deleted file mode 100644
index 3fcfeda..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Proxy Service.
- */
-package org.apache.distributedlog.service;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
deleted file mode 100644
index fa3dd49..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/EqualLoadAppraiser.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import com.twitter.util.Future;
-
-/**
- * Equal Load Appraiser.
- *
- * <p>Treats all streams as equal: every stream is appraised with a load of 1, and refreshing
- * the cache completes immediately because there is nothing to refresh.
- */
-public class EqualLoadAppraiser implements LoadAppraiser {
-    /**
-     * Appraise the given stream with a load of 1.
-     *
-     * @param stream name of the stream
-     * @return an already-completed future holding a {@link StreamLoad} of 1 for the stream
-     */
-    @Override
-    public Future<StreamLoad> getStreamLoad(String stream) {
-        final StreamLoad unitLoad = new StreamLoad(stream, 1);
-        return Future.value(unitLoad);
-    }
-
-    /**
-     * Nothing is cached, so there is nothing to refresh.
-     *
-     * @return an already-completed future holding {@code null}
-     */
-    @Override
-    public Future<Void> refreshCache() {
-        final Future<Void> alreadyDone = Future.value(null);
-        return alreadyDone;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
deleted file mode 100644
index 2e9dd6b..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-/**
- * Least Load Placement Policy.
- *
- * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
- * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
- * the load of a server would be. This placement policy then distributes these streams across the
- * servers.
- *
- * <p>The policy keeps two views of the same placement: the ordered set of server loads and a
- * stream-to-server lookup map. Methods of this class that touch them are {@code synchronized}
- * on this instance.
- */
-public class LeastLoadPlacementPolicy extends PlacementPolicy {
-
-    private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class);
-
-    private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-    private Map<String, String> streamToServer = new HashMap<String, String>();
-
-    /**
-     * Create the policy and register the {@code placement/load.diff} gauge.
-     *
-     * <p>The gauge reports the load of the last server minus the load of the first server in
-     * the ordered set, or 0 when there are no servers.
-     *
-     * @param loadAppraiser appraiser used to obtain the load of each stream
-     * @param routingService passed through to {@link PlacementPolicy}
-     * @param namespace passed through to {@link PlacementPolicy}
-     * @param placementStateManager used to persist refreshed ownership
-     * @param refreshInterval passed through to {@link PlacementPolicy}
-     * @param statsLogger stats logger the gauge is registered with
-     */
-    public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                                    DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
-                                    Duration refreshInterval, StatsLogger statsLogger) {
-        super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
-        statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
-            @Override
-            public Number getDefaultValue() {
-                return 0;
-            }
-
-            @Override
-            public Number getSample() {
-                // NOTE(review): serverLoads is read here without holding the instance lock that
-                // the other accessors take - confirm this is acceptable for a stats gauge.
-                if (serverLoads.size() > 0) {
-                    return serverLoads.last().getLoad() - serverLoads.first().getLoad();
-                } else {
-                    return getDefaultValue();
-                }
-            }
-        });
-    }
-
-    /**
-     * Look up the server a stream is currently mapped to.
-     *
-     * @param stream name of the stream
-     * @return the owning server, or {@code null} if the stream has no recorded owner
-     */
-    private synchronized String getStreamOwner(String stream) {
-        return streamToServer.get(stream);
-    }
-
-    /**
-     * Place a stream on a server.
-     *
-     * <p>A stream that already has a recorded owner is returned immediately; otherwise its load
-     * is appraised and it is added to the first server of the ordered load set.
-     *
-     * @param stream name of the stream
-     * @return future holding the name of the server the stream is placed on
-     */
-    @Override
-    public Future<String> placeStream(String stream) {
-        String streamOwner = getStreamOwner(stream);
-        if (null != streamOwner) {
-            return Future.value(streamOwner);
-        }
-        Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
-        return streamLoadFuture.map(new Function<StreamLoad, String>() {
-            @Override
-            public String apply(StreamLoad streamLoad) {
-                return placeStreamSynchronized(streamLoad);
-            }
-        });
-    }
-
-    /**
-     * Add the stream to the first server of the ordered load set and record the ownership.
-     *
-     * @param streamLoad appraised load of the stream to place
-     * @return name of the server now owning the stream
-     * @throws IllegalStateException if there is no server to place the stream on
-     */
-    private synchronized String placeStreamSynchronized(StreamLoad streamLoad) {
-        // The stream may have been placed by another caller while its load was being appraised;
-        // placing it again would count its load twice.
-        String existingOwner = streamToServer.get(streamLoad.getStream());
-        if (null != existingOwner) {
-            return existingOwner;
-        }
-        // pollFirst() returns null on an empty set; fail with a clear message instead of an NPE.
-        ServerLoad serverLoad = serverLoads.pollFirst();
-        if (null == serverLoad) {
-            throw new IllegalStateException(
-                "No servers available to place stream " + streamLoad.getStream());
-        }
-        serverLoad.addStream(streamLoad);
-        // Re-insert so that the set is re-ordered with the updated server load.
-        serverLoads.add(serverLoad);
-        // Keep the lookup map in step with serverLoads so later calls find this owner.
-        streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
-        return serverLoad.getServer();
-    }
-
-    /**
-     * Refresh the appraiser cache, recalculate the placement of all streams and persist it.
-     *
-     * <p>If the recalculated placement cannot be persisted, an error is logged and the current
-     * placement is kept.
-     */
-    @Override
-    public void refresh() {
-        logger.info("Refreshing server loads.");
-        Future<Void> refresh = loadAppraiser.refreshCache();
-        final Set<String> servers = getServers();
-        final Set<String> allStreams = getStreams();
-        Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(
-            new Function<Void, Future<TreeSet<ServerLoad>>>() {
-                @Override
-                public Future<TreeSet<ServerLoad>> apply(Void v1) {
-                    return calculate(servers, allStreams);
-                }
-            });
-        serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(TreeSet<ServerLoad> refreshedLoads) {
-                try {
-                    updateServerLoads(refreshedLoads);
-                } catch (PlacementStateManager.StateManagerSaveException e) {
-                    logger.error("The refreshed mapping could not be persisted and will not be used.", e);
-                }
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-
-    /**
-     * Persist the given placement and, once saved, make it the current one.
-     *
-     * @param serverLoads the new placement
-     * @throws PlacementStateManager.StateManagerSaveException if saving the ownership fails; the
-     *          current placement is left untouched in that case
-     */
-    private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads)
-        throws PlacementStateManager.StateManagerSaveException {
-        // Save first: the in-memory state is only replaced when persisting succeeded.
-        this.placementStateManager.saveOwnership(serverLoads);
-        this.streamToServer = serverLoadsToMap(serverLoads);
-        this.serverLoads = serverLoads;
-    }
-
-    /**
-     * Replace the current placement with the given one, without persisting it.
-     *
-     * @param serverLoads the placement to use
-     */
-    @Override
-    public synchronized void load(TreeSet<ServerLoad> serverLoads) {
-        this.serverLoads = serverLoads;
-        this.streamToServer = serverLoadsToMap(serverLoads);
-    }
-
-    /**
-     * Calculate a placement of the given streams over the given servers.
-     *
-     * <p>The load of every stream is appraised first. Each server then receives one stream
-     * while streams remain, and every remaining stream is added to the first server of the
-     * ordered load set in turn. Success and failure latencies are recorded in
-     * {@code placementCalcStats}.
-     *
-     * @param servers names of the servers to place streams on
-     * @param streams names of the streams to place
-     * @return future holding the calculated server loads
-     */
-    public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
-        logger.info("Calculating server loads");
-        final long startTime = System.currentTimeMillis();
-        ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
-
-        for (String stream : streams) {
-            Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
-            futures.add(streamLoad);
-        }
-
-        return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
-            @Override
-            public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
-                /* Sort streamLoads so largest streams are placed first for better balance */
-                // NOTE(review): which end pollFirst() takes depends on StreamLoad's ordering,
-                // which is not visible here - confirm it yields the largest stream first.
-                TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
-                for (StreamLoad streamLoad : streamLoads) {
-                    streamQueue.add(streamLoad);
-                }
-
-                TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-                for (String server : servers) {
-                    ServerLoad serverLoad = new ServerLoad(server);
-                    if (!streamQueue.isEmpty()) {
-                        serverLoad.addStream(streamQueue.pollFirst());
-                    }
-                    serverLoads.add(serverLoad);
-                }
-
-                while (!streamQueue.isEmpty()) {
-                    ServerLoad serverLoad = serverLoads.pollFirst();
-                    // Only possible when no servers were given; report it instead of an NPE.
-                    if (null == serverLoad) {
-                        throw new IllegalStateException(
-                            "No servers available to place " + streamQueue.size() + " stream(s)");
-                    }
-                    serverLoad.addStream(streamQueue.pollFirst());
-                    serverLoads.add(serverLoad);
-                }
-                return serverLoads;
-            }
-        }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-                placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
-                return BoxedUnit.UNIT;
-            }
-        }).onFailure(new Function<Throwable, BoxedUnit>() {
-            @Override
-            public BoxedUnit apply(Throwable t) {
-                logger.error("Failure calculating loads", t);
-                placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
-                return BoxedUnit.UNIT;
-            }
-        });
-    }
-
-    /**
-     * Build the stream-to-server lookup map for a placement.
-     *
-     * @param serverLoads the placement to index
-     * @return map from stream name to the name of the server holding that stream
-     */
-    private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
-        HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
-        for (ServerLoad serverLoad : serverLoads) {
-            for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
-                streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
-            }
-        }
-        return streamToServer;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
deleted file mode 100644
index 5cd8980..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/LoadAppraiser.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import com.twitter.util.Future;
-
-/**
- * Interface for load appraiser.
- */
-public interface LoadAppraiser {
- /**
- * Retrieve the stream load for a given {@code stream}.
- *
- * @param stream name of the stream
- * @return the stream load of the stream.
- */
- Future<StreamLoad> getStreamLoad(String stream);
-
- /**
- * Refresh the cache.
- *
- * @return a future for the refresh; presumably it completes once the cache has been
- * refreshed (confirm against the implementations)
- */
- Future<Void> refreshCache();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
deleted file mode 100644
index ac952aa..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.service.DLSocketAddress;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.TreeSet;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-/**
- * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains.
- *
- * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
- * then distribute these StreamLoads to the available servers in a manner defined by the
- * implementation, creating ServerLoad objects. It then saves this assignment via the
- * PlacementStateManager.
- */
-public abstract class PlacementPolicy {
-
- private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
-
- protected final LoadAppraiser loadAppraiser;
- protected final RoutingService routingService;
- protected final DistributedLogNamespace namespace;
- protected final PlacementStateManager placementStateManager;
- private final Duration refreshInterval;
- protected final OpStatsLogger placementCalcStats;
- private Timer placementRefreshTimer;
-
- public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
- DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
- Duration refreshInterval, StatsLogger statsLogger) {
- this.loadAppraiser = loadAppraiser;
- this.routingService = routingService;
- this.namespace = namespace;
- this.placementStateManager = placementStateManager;
- this.refreshInterval = refreshInterval;
- placementCalcStats = statsLogger.getOpStatsLogger("placement");
- }
-
- public Set<String> getServers() {
- Set<SocketAddress> hosts = routingService.getHosts();
- Set<String> servers = new HashSet<String>(hosts.size());
- for (SocketAddress address : hosts) {
- servers.add(DLSocketAddress.toString((InetSocketAddress) address));
- }
- return servers;
- }
-
- public Set<String> getStreams() {
- Set<String> streams = new HashSet<String>();
- try {
- Iterator<String> logs = namespace.getLogs();
- while (logs.hasNext()) {
- streams.add(logs.next());
- }
- } catch (IOException e) {
- logger.error("Could not get streams for placement policy.", e);
- }
- return streams;
- }
-
- public void start(boolean leader) {
- logger.info("Starting placement policy");
-
- TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
- for (String server : getServers()) {
- emptyServerLoads.add(new ServerLoad(server));
- }
- load(emptyServerLoads); //Pre-Load so streams don't NPE
- if (leader) { //this is the leader shard
- logger.info("Shard is leader. Scheduling timed refresh.");
- placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
- placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- refresh();
- return BoxedUnit.UNIT;
- }
- });
- } else {
- logger.info("Shard is not leader. Watching for server load changes.");
- placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
- @Override
- public void callback(TreeSet<ServerLoad> serverLoads) {
- if (!serverLoads.isEmpty()) {
- load(serverLoads);
- }
- }
- });
- }
- }
-
- public void close() {
- if (placementRefreshTimer != null) {
- placementRefreshTimer.stop();
- }
- }
-
- /**
- * Places the stream on a server according to the policy.
- *
- * <p>It returns a future containing the host that owns the stream upon completion.
- */
- public abstract Future<String> placeStream(String stream);
-
- /**
- * Recalculates the entire placement mapping and stores it using the PlacementStateManager.
- */
- public abstract void refresh();
-
- /**
- * Loads the placement mapping into the node from a TreeSet of ServerLoads.
- */
- public abstract void load(TreeSet<ServerLoad> serverLoads);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
deleted file mode 100644
index 0187bed..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/PlacementStateManager.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import java.util.TreeSet;
-
-/**
- * The PlacementStateManager handles persistence of calculated resource placements.
- */
-public interface PlacementStateManager {
-
- /**
- * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage.
- */
- void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
-
- /**
- * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage.
- */
- TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
-
- /**
- * Watch the persistent storage for changes to the ownership mapping.
- *
- * <p>The placementCallback will be triggered with the new mapping when a change occurs.
- */
- void watch(PlacementCallback placementCallback);
-
- /**
- * Placement Callback.
- *
- * <p>The callback is triggered when server loads are updated.
- */
- interface PlacementCallback {
- void callback(TreeSet<ServerLoad> serverLoads);
- }
-
- /**
- * The base exception thrown when state manager encounters errors.
- */
- abstract class StateManagerException extends Exception {
- public StateManagerException(String message, Exception e) {
- super(message, e);
- }
- }
-
- /**
- * Exception thrown when failed to load the ownership mapping.
- */
- class StateManagerLoadException extends StateManagerException {
- public StateManagerLoadException(Exception e) {
- super("Load of Ownership failed", e);
- }
- }
-
- /**
- * Exception thrown when failed to save the ownership mapping.
- */
- class StateManagerSaveException extends StateManagerException {
- public StateManagerSaveException(Exception e) {
- super("Save of Ownership failed", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
deleted file mode 100644
index d65c401..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/ServerLoad.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-
-/**
- * An object that represents the server load.
- *
- * <p>A comparable data object containing the identifier of the server, total appraised load on the
- * server, and all streams assigned to the server by the resource placement mapping. This is
- * comparable first by load and then by server so that a sorted data structure of these will be
- * consistent across multiple calculations.
- */
-public class ServerLoad implements Comparable {
- private static final int BUFFER_SIZE = 4096000;
- private final String server;
- private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
- private long load = 0L;
-
- public ServerLoad(String server) {
- this.server = server;
- }
-
- public synchronized long addStream(StreamLoad stream) {
- this.load += stream.getLoad();
- streamLoads.add(stream);
- return this.load;
- }
-
- public synchronized long removeStream(String stream) {
- for (StreamLoad streamLoad : streamLoads) {
- if (streamLoad.stream.equals(stream)) {
- this.load -= streamLoad.getLoad();
- streamLoads.remove(streamLoad);
- return this.load;
- }
- }
- return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
- }
-
- public synchronized long getLoad() {
- return load;
- }
-
- public synchronized Set<StreamLoad> getStreamLoads() {
- return streamLoads;
- }
-
- public synchronized String getServer() {
- return server;
- }
-
- protected synchronized org.apache.distributedlog.service.placement.thrift.ServerLoad toThrift() {
- org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
- new org.apache.distributedlog.service.placement.thrift.ServerLoad();
- tServerLoad.setServer(server);
- tServerLoad.setLoad(load);
- ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads =
- new ArrayList<org.apache.distributedlog.service.placement.thrift.StreamLoad>();
- for (StreamLoad streamLoad : streamLoads) {
- tStreamLoads.add(streamLoad.toThrift());
- }
- tServerLoad.setStreams(tStreamLoads);
- return tServerLoad;
- }
-
- public byte[] serialize() throws IOException {
- TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- toThrift().write(protocol);
- transport.flush();
- return transport.toString(UTF_8.name()).getBytes(UTF_8);
- } catch (TException e) {
- throw new IOException("Failed to serialize server load : ", e);
- } catch (UnsupportedEncodingException uee) {
- throw new IOException("Failed to serialize server load : ", uee);
- }
- }
-
- public static ServerLoad deserialize(byte[] data) throws IOException {
- org.apache.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
- new org.apache.distributedlog.service.placement.thrift.ServerLoad();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- tServerLoad.read(protocol);
- ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
- if (tServerLoad.isSetStreams()) {
- for (org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad :
- tServerLoad.getStreams()) {
- serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
- }
- }
- return serverLoad;
- } catch (TException e) {
- throw new IOException("Failed to deserialize server load : ", e);
- }
- }
-
- @Override
- public synchronized int compareTo(Object o) {
- ServerLoad other = (ServerLoad) o;
- if (load == other.getLoad()) {
- return server.compareTo(other.getServer());
- } else {
- return Long.compare(load, other.getLoad());
- }
- }
-
- @Override
- public synchronized boolean equals(Object o) {
- if (!(o instanceof ServerLoad)) {
- return false;
- }
- ServerLoad other = (ServerLoad) o;
- return server.equals(other.getServer())
- && load == other.getLoad()
- && streamLoads.equals(other.getStreamLoads());
- }
-
- @Override
- public synchronized String toString() {
- return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
- }
-
- @Override
- public synchronized int hashCode() {
- return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
deleted file mode 100644
index f271222..0000000
--- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/placement/StreamLoad.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service.placement;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TJSONProtocol;
-import org.apache.thrift.transport.TMemoryBuffer;
-import org.apache.thrift.transport.TMemoryInputTransport;
-
-/**
- * An object that represents the load of a stream.
- *
- * <p>A comparable data object containing the identifier of the stream and the appraised load produced
- * by the stream.
- */
-public class StreamLoad implements Comparable {
- private static final int BUFFER_SIZE = 4096;
- public final String stream;
- private final int load;
-
- public StreamLoad(String stream, int load) {
- this.stream = stream;
- this.load = load;
- }
-
- public int getLoad() {
- return load;
- }
-
- public String getStream() {
- return stream;
- }
-
- protected org.apache.distributedlog.service.placement.thrift.StreamLoad toThrift() {
- org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
- new org.apache.distributedlog.service.placement.thrift.StreamLoad();
- return tStreamLoad.setStream(stream).setLoad(load);
- }
-
- public byte[] serialize() throws IOException {
- TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- toThrift().write(protocol);
- transport.flush();
- return transport.toString(UTF_8.name()).getBytes(UTF_8);
- } catch (TException e) {
- throw new IOException("Failed to serialize stream load : ", e);
- } catch (UnsupportedEncodingException uee) {
- throw new IOException("Failed to serialize stream load : ", uee);
- }
- }
-
- public static StreamLoad deserialize(byte[] data) throws IOException {
- org.apache.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
- new org.apache.distributedlog.service.placement.thrift.StreamLoad();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- tStreamLoad.read(protocol);
- return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
- } catch (TException e) {
- throw new IOException("Failed to deserialize stream load : ", e);
- }
- }
-
- @Override
- public int compareTo(Object o) {
- StreamLoad other = (StreamLoad) o;
- if (load == other.getLoad()) {
- return stream.compareTo(other.getStream());
- } else {
- return Long.compare(load, other.getLoad());
- }
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof StreamLoad)) {
- return false;
- }
- StreamLoad other = (StreamLoad) o;
- return stream.equals(other.getStream()) && load == other.getLoad();
- }
-
- @Override
- public String toString() {
- return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
- }
-
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(stream).append(load).build();
- }
-}