You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:19 UTC
[09/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java
deleted file mode 100644
index 9a4cb3d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubInfo.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package org.apache.hedwig.server.topics;
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-
-import org.apache.hedwig.protocol.PubSubProtocol.HubInfoData;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-
-/**
- * Info identifies a hub server.
- */
-public class HubInfo {
-
- public static class InvalidHubInfoException extends Exception {
- public InvalidHubInfoException(String msg) {
- super(msg);
- }
-
- public InvalidHubInfoException(String msg, Throwable t) {
- super(msg, t);
- }
- }
-
- // address identifying a hub server
- final HedwigSocketAddress addr;
- // its znode czxid
- final long czxid;
- // protobuf encoded hub info data to be serialized
- HubInfoData hubInfoData;
-
- public HubInfo(HedwigSocketAddress addr, long czxid) {
- this(addr, czxid, null);
- }
-
- protected HubInfo(HedwigSocketAddress addr, long czxid,
- HubInfoData data) {
- this.addr = addr;
- this.czxid = czxid;
- this.hubInfoData = data;
- }
-
- public HedwigSocketAddress getAddress() {
- return addr;
- }
-
- public long getZxid() {
- return czxid;
- }
-
- private synchronized HubInfoData getHubInfoData() {
- if (null == hubInfoData) {
- hubInfoData = HubInfoData.newBuilder().setHostname(addr.toString())
- .setCzxid(czxid).build();
- }
- return hubInfoData;
- }
-
- @Override
- public String toString() {
- return TextFormat.printToString(getHubInfoData());
- }
-
- @Override
- public boolean equals(Object o) {
- if (null == o) {
- return false;
- }
- if (!(o instanceof HubInfo)) {
- return false;
- }
- HubInfo other = (HubInfo)o;
- if (null == addr) {
- if (null == other.addr) {
- return true;
- } else {
- return czxid == other.czxid;
- }
- } else {
- if (addr.equals(other.addr)) {
- return czxid == other.czxid;
- } else {
- return false;
- }
- }
- }
-
- @Override
- public int hashCode() {
- return addr.hashCode();
- }
-
- /**
- * Parse hub info from a string.
- *
- * @param hubInfoStr
- * String representation of hub info
- * @return hub info
- * @throws InvalidHubInfoException when <code>hubInfoStr</code> is not a valid
- * string representation of hub info.
- */
- public static HubInfo parse(String hubInfoStr) throws InvalidHubInfoException {
- // it is not protobuf encoded hub info, it might be generated by ZkTopicManager
- if (!hubInfoStr.startsWith("hostname")) {
- final HedwigSocketAddress owner;
- try {
- owner = new HedwigSocketAddress(hubInfoStr);
- } catch (Exception e) {
- throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoStr, e);
- }
- return new HubInfo(owner, 0L);
- }
-
- // it is a protobuf encoded hub info.
- HubInfoData hubInfoData;
-
- try {
- BufferedReader reader = new BufferedReader(
- new StringReader(hubInfoStr));
- HubInfoData.Builder dataBuilder = HubInfoData.newBuilder();
- TextFormat.merge(reader, dataBuilder);
- hubInfoData = dataBuilder.build();
- } catch (InvalidProtocolBufferException ipbe) {
- throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ipbe);
- } catch (IOException ie) {
- throw new InvalidHubInfoException("Corrupted hub info : " + hubInfoStr, ie);
- }
-
- final HedwigSocketAddress owner;
- try {
- owner = new HedwigSocketAddress(hubInfoData.getHostname().trim());
- } catch (Exception e) {
- throw new InvalidHubInfoException("Corrupted hub server address : " + hubInfoData.getHostname(), e);
- }
- long ownerZxid = hubInfoData.getCzxid();
- return new HubInfo(owner, ownerZxid, hubInfoData);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
deleted file mode 100644
index 2f76020..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubLoad.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.hedwig.server.topics;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-
-import org.apache.hedwig.protocol.PubSubProtocol.HubLoadData;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.TextFormat;
-
-/**
- * This class encapsulates metrics for determining the load on a hub server.
- */
-public class HubLoad implements Comparable<HubLoad> {
-
- public static final HubLoad MAX_LOAD = new HubLoad(Long.MAX_VALUE);
- public static final HubLoad MIN_LOAD = new HubLoad(0);
-
- public static class InvalidHubLoadException extends Exception {
- private static final long serialVersionUID = 5870487176956413387L;
-
- public InvalidHubLoadException(String msg) {
- super(msg);
- }
-
- public InvalidHubLoadException(String msg, Throwable t) {
- super(msg, t);
- }
- }
-
- // how many topics that a hub server serves
- long numTopics;
-
- public HubLoad(long num) {
- this.numTopics = num;
- }
-
- public HubLoad(HubLoadData data) {
- this.numTopics = data.getNumTopics();
- }
-
- // TODO: Make this threadsafe (BOOKKEEPER-379)
- public HubLoad setNumTopics(long numTopics) {
- this.numTopics = numTopics;
- return this;
- }
-
- public long getNumTopics() {
- return this.numTopics;
- }
-
- public HubLoadData toHubLoadData() {
- return HubLoadData.newBuilder().setNumTopics(numTopics).build();
- }
-
- @Override
- public String toString() {
- return TextFormat.printToString(toHubLoadData());
- }
-
- @Override
- public boolean equals(Object o) {
- if (null == o ||
- !(o instanceof HubLoad)) {
- return false;
- }
- return 0 == compareTo((HubLoad)o);
- }
-
- @Override
- public int compareTo(HubLoad other) {
- return numTopics > other.numTopics ?
- 1 : (numTopics < other.numTopics ? -1 : 0);
- }
-
- @Override
- public int hashCode() {
- return (int)numTopics;
- }
-
- /**
- * Parse hub load from a string.
- *
- * @param hubLoadStr
- * String representation of hub load
- * @return hub load
- * @throws InvalidHubLoadException when <code>hubLoadStr</code> is not a valid
- * string representation of hub load.
- */
- public static HubLoad parse(String hubLoadStr) throws InvalidHubLoadException {
- // it is not protobuf encoded hub load, it might be generated by ZkTopicManager
- if (!hubLoadStr.startsWith("numTopics")) {
- try {
- long numTopics = Long.parseLong(hubLoadStr, 10);
- return new HubLoad(numTopics);
- } catch (NumberFormatException nfe) {
- throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, nfe);
- }
- }
- // it is a protobuf encoded hub load data.
- HubLoadData hubLoadData;
- try {
- BufferedReader reader = new BufferedReader(
- new StringReader(hubLoadStr));
- HubLoadData.Builder dataBuilder = HubLoadData.newBuilder();
- TextFormat.merge(reader, dataBuilder);
- hubLoadData = dataBuilder.build();
- } catch (InvalidProtocolBufferException ipbe) {
- throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, ipbe);
- } catch (IOException ie) {
- throw new InvalidHubLoadException("Corrupted hub load data : " + hubLoadStr, ie);
- }
-
- return new HubLoad(hubLoadData);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
deleted file mode 100644
index 12524c9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/HubServerManager.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.io.IOException;
-
-import org.apache.hedwig.util.Callback;
-
-/**
- * The HubServerManager class manages info about hub servers.
- */
-interface HubServerManager {
-
- static interface ManagerListener {
-
- /**
- * Server manager is suspended if encountering some transient errors.
- * {@link #onResume()} would be called if those errors could be fixed.
- * {@link #onShutdown()} would be called if those errors could not be fixed.
- */
- public void onSuspend();
-
- /**
- * Server manager is resumed after fixing some transient errors.
- */
- public void onResume();
-
- /**
- * Server manager had to shutdown due to unrecoverable errors.
- */
- public void onShutdown();
- }
-
- /**
- * Register a listener to listen for events of the server manager
- *
- * @param listener
- * Server Manager Listener
- */
- public void registerListener(ManagerListener listener);
-
- /**
- * Register itself to the cluster.
- *
- * @param selfLoad
- * Self load data
- * @param callback
- * Callback when itself registered.
- * @param ctx
- * Callback context.
- */
- public void registerSelf(HubLoad selfLoad, Callback<HubInfo> callback, Object ctx);
-
- /**
- * Unregister itself from the cluster.
- */
- public void unregisterSelf() throws IOException;
-
- /**
- * Uploading self server load data.
- *
- * It is an asynchronous call which should not block other operations.
- * Currently we don't need to care about whether it succeeds or not.
- *
- * @param selfLoad
- * Hub server load data.
- */
- public void uploadSelfLoadData(HubLoad selfLoad);
-
- /**
- * Check whether a hub server is still alive under the given id
- *
- * @param hub
- * Hub id to identify a lifecycle of a hub server
- * @param callback
- * Callback of check result. If the hub server is still
- * alive as the provided id <code>hub</code>, return true.
- * Otherwise return false.
- * @param ctx
- * Callback context
- */
- public void isHubAlive(HubInfo hub, Callback<Boolean> callback, Object ctx);
-
- /**
- * Choose a least loaded hub server from available hub servers.
- *
- * @param callback
- * Callback to return least loaded hub server.
- * @param ctx
- * Callback context.
- */
- public void chooseLeastLoadedHub(Callback<HubInfo> callback, Object ctx);
-
- /**
- * Try to rebalance the load within the cluster. This function will get
- * the {@link HubLoad} from all available hubs within the cluster, and then
- * shed additional load.
- *
- * @param tolerancePercentage
- * the percentage of load above average that is permissible.
- * @param maxLoadToShed
- * the maximum amount of load to shed per call.
- * @param callback
- * Callback indicating whether we reduced load or not.
- * @param ctx
- */
- public void rebalanceCluster(double tolerancePercentage, HubLoad maxLoadToShed,
- Callback<Boolean> callback, Object ctx);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
deleted file mode 100644
index 65cc9c4..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/MMTopicManager.java
+++ /dev/null
@@ -1,359 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.TopicOwnershipManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.zookeeper.ZooKeeper;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-/**
- * TopicOwnershipManager based topic manager
- */
-public class MMTopicManager extends AbstractTopicManager implements TopicManager {
-
- private static final Logger logger = LoggerFactory.getLogger(MMTopicManager.class);
-
- // topic ownership manager
- private final TopicOwnershipManager mm;
- // hub server manager
- private final HubServerManager hubManager;
-
- private final HubInfo myHubInfo;
- private final HubLoad myHubLoad;
-
- // Boolean flag indicating if we should suspend activity. If this is true,
- // all of the Ops put into the queuer will fail automatically.
- protected volatile boolean isSuspended = false;
-
- public MMTopicManager(ServerConfiguration cfg, ZooKeeper zk,
- MetadataManagerFactory mmFactory,
- ScheduledExecutorService scheduler)
- throws UnknownHostException, PubSubException {
- super(cfg, scheduler);
- // initialize topic ownership manager
- this.mm = mmFactory.newTopicOwnershipManager();
- this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
-
- final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
- new SynchronousQueue<Either<HubInfo, PubSubException>>();
-
- myHubLoad = new HubLoad(topics.size());
- this.hubManager.registerListener(new HubServerManager.ManagerListener() {
- @Override
- public void onSuspend() {
- isSuspended = true;
- }
- @Override
- public void onResume() {
- isSuspended = false;
- }
- @Override
- public void onShutdown() {
- // if hub server manager can't work, we have to quit
- Runtime.getRuntime().exit(1);
- }
- });
- this.hubManager.registerSelf(myHubLoad, new Callback<HubInfo>() {
- @Override
- public void operationFinished(final Object ctx, final HubInfo resultOfOperation) {
- logger.info("Successfully registered hub {} with zookeeper", resultOfOperation);
- ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null));
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Failed to register hub with zookeeper", exception);
- ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception));
- }
- }, null);
- Either<HubInfo, PubSubException> result = ConcurrencyUtils.take(queue);
- PubSubException pse = result.right();
- if (pse != null) {
- throw pse;
- }
- myHubInfo = result.left();
- logger.info("Start metadata manager based topic manager with hub id : " + myHubInfo);
- }
-
- @Override
- protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
- final Callback<HedwigSocketAddress> cb, final Object ctx) {
- // If operations are suspended due to a ZK client disconnect, just error
- // out this call and return.
- if (isSuspended) {
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(
- "MMTopicManager service is temporarily suspended!"));
- return;
- }
-
- TopicStats stats = topics.getIfPresent(topic);
- if (null != stats) {
- cb.operationFinished(ctx, addr);
- return;
- }
-
- new MMGetOwnerOp(topic, cb, ctx).read();
- }
-
- /**
- * MetadataManager does topic ledger election using versioned writes.
- */
- class MMGetOwnerOp {
- ByteString topic;
- Callback<HedwigSocketAddress> cb;
- Object ctx;
-
- public MMGetOwnerOp(ByteString topic,
- Callback<HedwigSocketAddress> cb, Object ctx) {
- this.topic = topic;
- this.cb = cb;
- this.ctx = ctx;
- }
-
- protected void read() {
- mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
- @Override
- public void operationFinished(final Object ctx, final Versioned<HubInfo> owner) {
- if (null == owner) {
- logger.info("{} : No owner found for topic {}",
- new Object[] { addr, topic.toStringUtf8() });
- // no data found
- choose(Version.NEW);
- return;
- }
- final Version ownerVersion = owner.getVersion();
- if (null == owner.getValue()) {
- logger.info("{} : Invalid owner found for topic {}",
- new Object[] { addr, topic.toStringUtf8() });
- choose(ownerVersion);
- return;
- }
- final HubInfo hub = owner.getValue();
- logger.info("{} : Read owner of topic {} : {}",
- new Object[] { addr, topic.toStringUtf8(), hub });
-
- logger.info("{}, {}", new Object[] { hub, myHubInfo });
-
- if (hub.getAddress().equals(addr)) {
- if (myHubInfo.getZxid() == hub.getZxid()) {
- claimTopic(ctx);
- return;
- } else {
- choose(ownerVersion);
- return;
- }
- }
-
- logger.info("{} : Check whether owner {} for topic {} is still alive.",
- new Object[] { addr, hub, topic.toStringUtf8() });
- hubManager.isHubAlive(hub, new Callback<Boolean>() {
- @Override
- public void operationFinished(Object ctx, Boolean isAlive) {
- if (isAlive) {
- cb.operationFinished(ctx, hub.getAddress());
- } else {
- choose(ownerVersion);
- }
- }
- @Override
- public void operationFailed(Object ctx, PubSubException pse) {
- cb.operationFailed(ctx, pse);
- }
- }, ctx);
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(
- "Could not read ownership for topic " + topic.toStringUtf8() + " : "
- + exception.getMessage()));
- }
- }, ctx);
- }
-
- public void claim(final Version prevOwnerVersion) {
- logger.info("{} : claiming topic {} 's owner to be {}",
- new Object[] { addr, topic.toStringUtf8(), myHubInfo });
- mm.writeOwnerInfo(topic, myHubInfo, prevOwnerVersion, new Callback<Version>() {
- @Override
- public void operationFinished(Object ctx, Version newVersion) {
- claimTopic(ctx);
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (exception instanceof PubSubException.NoTopicOwnerInfoException ||
- exception instanceof PubSubException.BadVersionException) {
- // someone has updated the owner
- logger.info("{} : Some one has claimed topic {} 's owner. Try to read the owner again.",
- new Object[] { addr, topic.toStringUtf8() });
- read();
- return;
- }
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(
- "Exception when writing owner info to claim ownership of topic "
- + topic.toStringUtf8() + " : " + exception.getMessage()));
- }
- }, ctx);
- }
-
- protected void claimTopic(Object ctx) {
- logger.info("{} : claimed topic {} 's owner to be {}",
- new Object[] { addr, topic.toStringUtf8(), myHubInfo });
- notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
- hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
- }
-
- public void choose(final Version prevOwnerVersion) {
- hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() {
- @Override
- public void operationFinished(Object ctx, HubInfo owner) {
- logger.info("{} : Least loaded owner {} is chosen for topic {}",
- new Object[] { addr, owner, topic.toStringUtf8() });
- if (owner.getAddress().equals(addr)) {
- claim(prevOwnerVersion);
- } else {
- setOwner(owner, prevOwnerVersion);
- }
- }
- @Override
- public void operationFailed(Object ctx, PubSubException pse) {
- logger.error("Failed to choose least loaded hub server for topic "
- + topic.toStringUtf8() + " : ", pse);
- cb.operationFailed(ctx, pse);
- }
- }, null);
- }
-
- public void setOwner(final HubInfo ownerHubInfo, final Version prevOwnerVersion) {
- logger.info("{} : setting topic {} 's owner to be {}",
- new Object[] { addr, topic.toStringUtf8(), ownerHubInfo });
- mm.writeOwnerInfo(topic, ownerHubInfo, prevOwnerVersion, new Callback<Version>() {
- @Override
- public void operationFinished(Object ctx, Version newVersion) {
- logger.info("{} : Set topic {} 's owner to be {}",
- new Object[] { addr, topic.toStringUtf8(), ownerHubInfo });
- cb.operationFinished(ctx, ownerHubInfo.getAddress());
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (exception instanceof PubSubException.NoTopicOwnerInfoException ||
- exception instanceof PubSubException.BadVersionException) {
- // someone has updated the owner
- logger.info("{} : Some one has set topic {} 's owner. Try to read the owner again.",
- new Object[] { addr, topic.toStringUtf8() });
- read();
- return;
- }
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(
- "Exception when writing owner info to claim ownership of topic "
- + topic.toStringUtf8() + " : " + exception.getMessage()));
- }
- }, ctx);
- }
- }
-
- @Override
- protected void postReleaseCleanup(final ByteString topic,
- final Callback<Void> cb, final Object ctx) {
-
- // Reduce load. We've removed the topic from our topic set, so do this as well.
- // When we reclaim the topic, we will increment the load again.
- hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
-
- mm.readOwnerInfo(topic, new Callback<Versioned<HubInfo>>() {
- @Override
- public void operationFinished(Object ctx, Versioned<HubInfo> owner) {
- if (null == owner) {
- // Node has somehow disappeared from under us, live with it
- logger.warn("No owner info found when cleaning up topic " + topic.toStringUtf8());
- cb.operationFinished(ctx, null);
- return;
- }
- // no valid hub info found, just return
- if (null == owner.getValue()) {
- logger.warn("No valid owner info found when cleaning up topic " + topic.toStringUtf8());
- cb.operationFinished(ctx, null);
- return;
- }
- HedwigSocketAddress ownerAddr = owner.getValue().getAddress();
- if (!ownerAddr.equals(addr)) {
- logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8()
- + " but owner " + owner + " found, leaving untouched");
- // Not our node, someone else's, leave it alone
- cb.operationFinished(ctx, null);
- return;
- }
-
- mm.deleteOwnerInfo(topic, owner.getVersion(), new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- cb.operationFinished(ctx, null);
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- if (exception instanceof PubSubException.NoTopicOwnerInfoException) {
- logger.warn("Wanted to clean up self owner info for topic " + topic.toStringUtf8()
- + " but it has been removed.");
- cb.operationFinished(ctx, null);
- return;
- }
- logger.error("Exception when deleting self-ownership metadata for topic "
- + topic.toStringUtf8() + " : ", exception);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception));
- }
- }, ctx);
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Exception when cleaning up owner info of topic " + topic.toStringUtf8() + " : ", exception);
- cb.operationFailed(ctx, new PubSubException.ServiceDownException(exception));
- }
- }, ctx);
- }
-
- @Override
- public void stop() {
- // we just unregister it with zookeeper to make it unavailable from hub servers list
- try {
- hubManager.unregisterSelf();
- } catch (IOException e) {
- logger.error("Error unregistering hub server " + myHubInfo + " : ", e);
- }
- super.stop();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
deleted file mode 100644
index 2a0dcc0..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicBasedLoadShedder.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-/**
- * Shed load by releasing topics.
- */
-public class TopicBasedLoadShedder {
- private static final Logger logger = LoggerFactory.getLogger(TopicBasedLoadShedder.class);
- private final double tolerancePercentage;
- private final long maxLoadToShed;
- private final TopicManager tm;
- private final List<ByteString> topicList;
-
- /**
- * @param tm The topic manager used to handle load shedding
- * @param tolerancePercentage The tolerance percentage for shedding load
- * @param maxLoadToShed The maximum amount of load to shed in one call.
- */
- public TopicBasedLoadShedder(TopicManager tm, double tolerancePercentage,
- HubLoad maxLoadToShed) {
- // Make sure that all functions in this class have a consistent view
- // of the load. So, we use the same topic list throughout.
- this(tm, tm.getTopicList(), tolerancePercentage, maxLoadToShed);
- }
-
- /**
- * This is package-private because it makes testing easier.
- * @param tm The topic manager used to handle load shedding
- * @param topicList The topic list representing topics owned by this hub.
- * @param tolerancePercentage The tolerance percentage for shedding load
- * @param maxLoadToShed The maximum amount of load to shed in one call.
- */
- TopicBasedLoadShedder(TopicManager tm, List<ByteString> topicList,
- double tolerancePercentage,
- HubLoad maxLoadToShed) {
- this.tolerancePercentage = tolerancePercentage;
- this.maxLoadToShed = maxLoadToShed.getNumTopics();
- this.tm = tm;
- this.topicList = topicList;
- }
-
- /**
- * Reduce the load on the current hub so that it reaches the target load.
- * We reduce load by releasing topics using the {@link TopicManager} passed
- * to the constructor. We use {@link TopicManager#releaseTopics(int, org.apache.hedwig.util.Callback, Object)}
- * to actually release topics.
- *
- * @param targetLoad
- * @param callback
- * a Callback<Long> that indicates how many topics we tried to release.
- * @param ctx
- */
- public void reduceLoadTo(HubLoad targetLoad, final Callback<Long> callback, final Object ctx) {
- int targetTopics = (int)targetLoad.toHubLoadData().getNumTopics();
- int numTopicsToRelease = topicList.size() - targetTopics;
-
- // The number of topics we own is less than the target topic size. We don't release
- // any topics in this case.
- if (numTopicsToRelease <= 0) {
- callback.operationFinished(ctx, 0L);
- return;
- }
- // Call releaseTopics() on the topic manager to do this. We let the manager handle the release
- // policy.
- tm.releaseTopics(numTopicsToRelease, callback, ctx);
- }
-
- /**
- * Calculate the average number of topics on the currently active hubs and release topics
- * if required.
- * We shed topics if we currently hold topics greater than average + average * tolerancePercentage/100.0
- * We shed a maximum of maxLoadToShed topics
- * We also hold on to at least one topic.
- * @param loadMap
- * @param callback
- * A return value of true means we tried to rebalance. False means that there was
- * no need to rebalance.
- * @param ctx
- */
- public void shedLoad(final Map<HubInfo, HubLoad> loadMap, final Callback<Boolean> callback,
- final Object ctx) {
-
- long totalTopics = 0L;
- long myTopics = topicList.size();
- for (Map.Entry<HubInfo, HubLoad> entry : loadMap.entrySet()) {
- if (null == entry.getKey() || null == entry.getValue()) {
- continue;
- }
- totalTopics += entry.getValue().toHubLoadData().getNumTopics();
- }
-
- double averageTopics = (double)totalTopics/loadMap.size();
- logger.info("Total topics in the cluster : {}. Average : {}.", totalTopics, averageTopics);
-
- // Handle the case when averageTopics == 0. We hold on to at least 1 topic.
- long permissibleTopics =
- Math.max(1L, (long) Math.ceil(averageTopics + averageTopics * tolerancePercentage / 100.0));
- logger.info("Permissible topics : {}. Number of topics this hub holds : {}.", permissibleTopics, myTopics);
- if (myTopics <= permissibleTopics) {
- // My owned topics are less than those permitted by the current tolerance level. No need to release
- // any topics.
- callback.operationFinished(ctx, false);
- return;
- }
-
- // The number of topics I own is more than what I should be holding. We shall now attempt to shed some load.
- // We shed at most maxLoadToShed number of topics. We also hold on to at least 1 topic.
- long targetNumTopics = Math.max(1L, Math.max((long)Math.ceil(averageTopics), myTopics - maxLoadToShed));
-
- // Reduce the load on the current hub to the target load we calculated above.
- logger.info("Reducing load on this hub to {} topics.", targetNumTopics);
- reduceLoadTo(new HubLoad(targetNumTopics), new Callback<Long>() {
- @Override
- public void operationFinished(Object ctx, Long numReleased) {
- logger.info("Released {} topics to shed load.", numReleased);
- callback.operationFinished(ctx, true);
- }
-
- @Override
- public void operationFailed(Object ctx, PubSubException e) {
- callback.operationFailed(ctx, e);
- }
- }, ctx);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
deleted file mode 100644
index 4ed2e59..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicManager.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-import java.util.List;
-
-/**
- * An implementor of this interface is basically responsible for ensuring that
- * there is at most a single host responsible for a given topic at a given time.
- * Also, it is desirable that on a host failure, some other hosts in the cluster
- * claim responsibilities for the topics that were at the failed host. On
- * claiming responsibility for a topic, a host should call its
- * {@link TopicOwnershipChangeListener}.
- *
- */
-
-public interface TopicManager {
-    /**
-     * Asynchronously look up the host responsible for the given topic. The
-     * method itself returns nothing; the owner's address, or the failure
-     * (for example a {@link ServiceDownException} when the lookup cannot be
-     * carried out), is delivered through <code>cb</code>.
-     *
-     * @param topic
-     *            The topic whose owner to get.
-     * @param shouldClaim
-     *            Whether the caller would like this host to claim the topic
-     *            when it has no owner yet. How this is honoured is up to the
-     *            implementation.
-     * @param cb
-     *            Callback receiving the address of the responsible host.
-     * @param ctx
-     *            Opaque context object handed back to the callback.
-     */
-    void getOwner(ByteString topic, boolean shouldClaim,
-                  Callback<HedwigSocketAddress> cb, Object ctx);
-
-    /**
-     * Increment the number of access times for a given <code>topic</code>.
-     *
-     * @param topic
-     *            Topic Name.
-     */
-    void incrementTopicAccessTimes(ByteString topic);
-
-    /**
-     * Register a listener that is told when the set of topics owned by this
-     * node changes. Any component of the system (e.g., the
-     * {@link PersistenceManager}) can follow such changes by implementing
-     * {@link TopicOwnershipChangeListener} and registering itself here.
-     *
-     * Listeners must react to notifications immediately and without
-     * blocking: several listeners may need to be informed and they are all
-     * informed by the same thread.
-     *
-     * @param listener
-     *            The listener to add.
-     */
-    void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener);
-
-    /**
-     * Give up ownership of a topic. If this host does not own it, do nothing.
-     * The outcome is reported through <code>cb</code> rather than by a
-     * thrown exception.
-     *
-     * @param topic
-     *            The topic to release.
-     * @param cb
-     *            Callback invoked once the release has been processed.
-     * @param ctx
-     *            Opaque context object handed back to the callback.
-     */
-    void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx);
-
-    /**
-     * Release <code>numTopics</code> topics. If fewer are held, release all
-     * of them.
-     *
-     * @param numTopics
-     *            Number of topics to release.
-     * @param callback
-     *            The callback should be invoked with the number of topics the
-     *            hub released successfully.
-     * @param ctx
-     *            Opaque context object handed back to the callback.
-     */
-    void releaseTopics(int numTopics, Callback<Long> callback, Object ctx);
-
-    /**
-     * Get the list of topics this hub believes it is responsible for.
-     *
-     * @return the topics this hub currently considers its own.
-     */
-    List<ByteString> getTopicList();
-
-    /**
-     * Stop topic manager
-     */
-    void stop();
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
deleted file mode 100644
index b0fe2c9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TopicOwnershipChangeListener.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.util.Callback;
-
-/**
- * Listener for changes in the set of topics owned by this host.
- *
- * NOTE(review): the exact calling contract is defined by the topic manager
- * implementations, which are not part of this file; the descriptions below
- * are inferred from the method names and should be confirmed against them.
- */
-public interface TopicOwnershipChangeListener {
-
-    /**
-     * Presumably invoked when this host has taken ownership of
-     * <code>topic</code>; the listener is expected to complete
-     * <code>callback</code> once it has finished reacting (TODO confirm).
-     *
-     * @param topic
-     *            The topic concerned.
-     * @param callback
-     *            Callback to complete when the listener is done.
-     * @param ctx
-     *            Opaque context object handed back to the callback.
-     */
-    void acquiredTopic(ByteString topic, Callback<Void> callback, Object ctx);
-
-    /**
-     * Presumably invoked when this host no longer owns <code>topic</code>
-     * (TODO confirm).
-     *
-     * @param topic
-     *            The topic concerned.
-     */
-    void lostTopic(ByteString topic);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
deleted file mode 100644
index 6b3a417..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/TrivialOwnAllTopicManager.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public class TrivialOwnAllTopicManager extends AbstractTopicManager {
-
- public TrivialOwnAllTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
- throws UnknownHostException {
- super(cfg, scheduler);
- }
-
- @Override
- protected void realGetOwner(ByteString topic, boolean shouldClaim,
- Callback<HedwigSocketAddress> cb, Object ctx) {
-
- TopicStats stats = topics.getIfPresent(topic);
- if (null != stats) {
- cb.operationFinished(ctx, addr);
- return;
- }
-
- notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
- }
-
- @Override
- protected void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx) {
- // No cleanup to do
- cb.operationFinished(ctx, null);
- }
-
- @Override
- public void stop() {
- // do nothing
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
deleted file mode 100644
index 9651058..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkHubServerManager.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback;
-import org.apache.hedwig.zookeeper.ZkUtils;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ZooKeeper based hub server manager.
- */
-class ZkHubServerManager implements HubServerManager {
-
- static Logger logger = LoggerFactory.getLogger(ZkHubServerManager.class);
-
- final Random rand = new Random();
-
- private final ServerConfiguration conf;
- private final ZooKeeper zk;
- private final HedwigSocketAddress addr;
- private final TopicManager tm;
- private final String ephemeralNodePath;
- private final String hubNodesPath;
-
- // hub info structure represent itself
- protected HubInfo myHubInfo;
- protected volatile boolean isSuspended = false;
- protected ManagerListener listener = null;
- protected final ScheduledExecutorService executor;
-
- // upload hub server load to zookeeper
- StatCallback loadReportingStatCallback = new StatCallback() {
- @Override
- public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
- if (rc != KeeperException.Code.OK.intValue()) {
- logger.warn("Failed to update load information of hub {} in zk", myHubInfo);
- }
- }
- };
-
-    /**
-     * Watcher that follows the state of the ZooKeeper connection, keeps
-     * <code>isSuspended</code> up to date and relays disconnect, reconnect and
-     * session-expiry events to the registered {@link ManagerListener}, if any.
-     */
-    class ZkHubsWatcher implements Watcher {
-        @Override
-        public void process(WatchedEvent event) {
-            final Watcher.Event.KeeperState state = event.getState();
-            // Connection state changes arrive as events of type None.
-            if (event.getType().equals(Watcher.Event.EventType.None)) {
-                if (state.equals(Watcher.Event.KeeperState.Disconnected)) {
-                    onDisconnected();
-                } else if (state.equals(Watcher.Event.KeeperState.SyncConnected)) {
-                    onConnected();
-                }
-            }
-            // Expiry is checked regardless of the event type.
-            if (state.equals(Watcher.Event.KeeperState.Expired)) {
-                onExpired();
-            }
-        }
-
-        // Mark the manager suspended and tell the listener.
-        private void onDisconnected() {
-            logger.warn("ZK client has been disconnected to the ZK server!");
-            isSuspended = true;
-            if (listener != null) {
-                listener.onSuspend();
-            }
-        }
-
-        // Clear the suspended flag and tell the listener; the log line is
-        // only written when we were actually suspended before.
-        private void onConnected() {
-            if (isSuspended) {
-                logger.info("ZK client has been reconnected to the ZK server!");
-            }
-            isSuspended = false;
-            if (listener != null) {
-                listener.onResume();
-            }
-        }
-
-        // With a listener present, stop the executor right away and ask the
-        // listener to shut down; without one, the expiry is only logged.
-        private void onExpired() {
-            logger.error("ZK client connection to the ZK server has expired.!");
-            if (listener == null) {
-                return;
-            }
-            executor.shutdownNow();
-            listener.onShutdown();
-        }
-    }
-
-    /**
-     * Self-rescheduling rebalance job. Each run either skips the rebalance
-     * (while suspended) or triggers {@link #rebalanceCluster}; in both cases
-     * the next run is scheduled <code>delaySeconds</code> later, and for an
-     * attempted rebalance only after it has completed, so that two
-     * rebalances never overlap.
-     */
-    class RebalanceRunnable implements Runnable {
-        private final double tolerancePercentage;
-        private final HubLoad maxLoadToShed;
-        private final long delaySeconds;
-
-        public RebalanceRunnable(double tolerancePercentage,
-                                 HubLoad maxLoadToShed,
-                                 long delaySeconds) {
-            this.tolerancePercentage = tolerancePercentage;
-            this.maxLoadToShed = maxLoadToShed;
-            this.delaySeconds = delaySeconds;
-        }
-
-        @Override
-        public void run() {
-            if (!isSuspended) {
-                // This runnable travels as the callback context so that the
-                // callback can put it back on the executor when it is done.
-                rebalanceCluster(tolerancePercentage, maxLoadToShed, new Callback<Boolean>() {
-                    @Override
-                    public void operationFinished(Object ctx, Boolean didRebalance) {
-                        String outcome = didRebalance
-                                ? "The attempt to rebalance the cluster was successful"
-                                : "There was no need to rebalance.";
-                        logger.info(outcome);
-                        scheduleAgain((Runnable) ctx);
-                    }
-
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException e) {
-                        logger.error("The attempt to rebalance the cluster did not succeed.", e);
-                        scheduleAgain((Runnable) ctx);
-                    }
-
-                    private void scheduleAgain(Runnable job) {
-                        executor.schedule(job, delaySeconds, TimeUnit.SECONDS);
-                    }
-                }, this);
-                return;
-            }
-            // Suspended: do not attempt a rebalance, just try again later.
-            executor.schedule(this, delaySeconds, TimeUnit.SECONDS);
-        }
-
-        /**
-         * Schedule the first run. A non-positive delay disables the
-         * rebalancer entirely.
-         */
-        public void start() {
-            if (delaySeconds <= 0) {
-                return;
-            }
-            logger.info("Starting the rebalancer thread with tolerance={}, maxLoadToShed={} and delay={}",
-                        new Object[] { tolerancePercentage, maxLoadToShed.getNumTopics(), delaySeconds });
-            executor.schedule(this, delaySeconds, TimeUnit.SECONDS);
-        }
-    }
-
- /**
- * Create the hub server manager. Besides storing its arguments, the
- * constructor creates a single-threaded scheduled executor, registers a
- * {@link ZkHubsWatcher} on the given ZooKeeper client and starts the
- * rebalancer (which only schedules itself when the configured rebalance
- * interval is greater than zero).
- *
- * @param conf
- * Server configuration; supplies the znode prefix for hubs and the
- * rebalance settings.
- * @param zk
- * ZooKeeper client used for all znode operations.
- * @param addr
- * Address of this hub; used to derive its ephemeral znode path.
- * @param tm
- * Topic manager handed to the load shedder during a rebalance; may
- * be null, in which case rebalancing never sheds load.
- */
- public ZkHubServerManager(ServerConfiguration conf,
- ZooKeeper zk,
- HedwigSocketAddress addr,
- TopicManager tm) {
- this.conf = conf;
- this.zk = zk;
- this.addr = addr;
- this.tm = tm;
- // znode path to store all available hub servers
- this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString();
- // the node's ephemeral node path
- // (getHubZkNodePath reads this.conf, so conf must already be assigned)
- this.ephemeralNodePath = getHubZkNodePath(addr);
- this.executor = Executors.newSingleThreadScheduledExecutor();
- // register available hub servers list watcher
- zk.register(new ZkHubsWatcher());
-
- // Start the rebalancer here.
- new RebalanceRunnable(conf.getRebalanceTolerance(), conf.getRebalanceMaxShed(),
- conf.getRebalanceInterval()).start();
- }
-
- /**
- * Set the listener that is told about suspend, resume and shutdown events
- * detected by the ZooKeeper watcher. Only one listener is kept; a later
- * call replaces the earlier one.
- *
- * @param listener
- * The listener to notify.
- */
- @Override
- public void registerListener(ManagerListener listener) {
- this.listener = listener;
- }
-
-    /**
-     * Build the znode path identifying a hub server: the configured hosts
-     * prefix, a slash, and the string form of the hub's address.
-     *
-     * @param node
-     *            Hub Server Address
-     * @return znode path identifying the hub server.
-     */
-    private String getHubZkNodePath(HedwigSocketAddress node) {
-        return this.conf.getZkHostsPrefix(new StringBuilder())
-               .append("/")
-               .append(node)
-               .toString();
-    }
-
-    /**
-     * Register this hub in ZooKeeper by creating its ephemeral znode with the
-     * given load as data. Once created, the node is stat'ed and its creation
-     * zxid is combined with the hub address into the {@link HubInfo} passed
-     * to <code>callback</code>. If the node already exists it is treated as
-     * stale: it is deleted and the registration is retried.
-     *
-     * @param selfData
-     *            Initial load to store in the hub's znode.
-     * @param callback
-     *            Receives the resulting hub info, or a
-     *            {@link PubSubException.ServiceDownException} on failure.
-     * @param ctx
-     *            Opaque context object handed back to the callback.
-     */
-    @Override
-    public void registerSelf(final HubLoad selfData, final Callback<HubInfo> callback, Object ctx) {
-        // Last step: stat the freshly created node to learn its czxid.
-        final SafeAsyncZKCallback.StatCallback afterExists = new SafeAsyncZKCallback.StatCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
-                if (rc != Code.OK.intValue()) {
-                    callback.operationFailed(ctx,
-                                             new PubSubException.ServiceDownException(
-                                                 "I can't state my hub node after I created it : "
-                                                 + ephemeralNodePath));
-                    return;
-                }
-                myHubInfo = new HubInfo(addr, stat.getCzxid());
-                callback.operationFinished(ctx, myHubInfo);
-            }
-        };
-
-        // Recovery step: once the stale node is gone, start over.
-        final SafeAsyncZKCallback.VoidCallback afterDelete = new SafeAsyncZKCallback.VoidCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx) {
-                boolean gone = (rc == Code.OK.intValue() || rc == Code.NONODE.intValue());
-                if (!gone) {
-                    KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                             "Could not delete stale ephemeral node to register hub", ephemeralNodePath, rc);
-                    callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-                    return;
-                }
-                registerSelf(selfData, callback, ctx);
-            }
-        };
-
-        // First step: try to create the ephemeral node.
-        SafeAsyncZKCallback.StringCallback afterCreate = new SafeAsyncZKCallback.StringCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                if (rc == Code.OK.intValue()) {
-                    zk.exists(ephemeralNodePath, false, afterExists, ctx);
-                    return;
-                }
-                if (rc == Code.NODEEXISTS.intValue()) {
-                    logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
-                    zk.delete(ephemeralNodePath, -1, afterDelete, ctx);
-                    return;
-                }
-                KeeperException ke = ZkUtils.logErrorAndCreateZKException(
-                                         "Could not create ephemeral node to register hub", ephemeralNodePath, rc);
-                callback.operationFailed(ctx, new PubSubException.ServiceDownException(ke));
-            }
-        };
-
-        byte[] loadDataBytes = selfData.toString().getBytes(UTF_8);
-        ZkUtils.createFullPathOptimistic(zk, ephemeralNodePath, loadDataBytes, Ids.OPEN_ACL_UNSAFE,
-                                         CreateMode.EPHEMERAL, afterCreate, ctx);
-    }
-
-    /**
-     * Synchronously delete this hub's ephemeral znode, whatever its version.
-     *
-     * @throws IOException
-     *             wrapping the underlying {@link KeeperException}, or the
-     *             {@link InterruptedException} if the calling thread was
-     *             interrupted while waiting. In the latter case the thread's
-     *             interrupt status is restored before the exception is thrown
-     *             so that callers further up can still observe it.
-     */
-    @Override
-    public void unregisterSelf() throws IOException {
-        try {
-            zk.delete(ephemeralNodePath, -1);
-        } catch (InterruptedException e) {
-            // Wrapping would otherwise swallow the interruption.
-            Thread.currentThread().interrupt();
-            throw new IOException(e);
-        } catch (KeeperException e) {
-            throw new IOException(e);
-        }
-    }
-
-
-    /**
-     * Asynchronously write the given load into this hub's znode. The result
-     * is handled by <code>loadReportingStatCallback</code>, which only logs
-     * failures.
-     *
-     * @param selfLoad
-     *            Current load of this hub.
-     */
-    @Override
-    public void uploadSelfLoadData(HubLoad selfLoad) {
-        logger.debug("Reporting hub load of {} : {}", myHubInfo, selfLoad);
-        final byte[] serializedLoad = selfLoad.toString().getBytes(UTF_8);
-        // Version -1: overwrite regardless of the node's current version.
-        zk.setData(ephemeralNodePath, serializedLoad, -1, loadReportingStatCallback, null);
-    }
-
- @Override
- public void isHubAlive(final HubInfo hub, final Callback<Boolean> callback, Object ctx) {
- zk.exists(getHubZkNodePath(hub.getAddress()), false, new SafeAsyncZKCallback.StatCallback() {
- @Override
- public void safeProcessResult(int rc, String path, Object ctx, Stat stat) {
- if (rc == Code.NONODE.intValue()) {
- callback.operationFinished(ctx, false);
- } else if (rc == Code.OK.intValue()) {
- if (hub.getZxid() == stat.getCzxid()) {
- callback.operationFinished(ctx, true);
- } else {
- callback.operationFinished(ctx, false);
- }
- } else {
- callback.operationFailed(ctx, new PubSubException.ServiceDownException(
- "Failed to check whether hub server " + hub + " is alive!"));
- }
- }
- }, ctx);
- }
-
-    /**
-     * Pick the hub with the smallest advertised load. The list of hubs is
-     * read from the children of <code>hubNodesPath</code>; the comparison
-     * itself is done by <code>chooseLeastLoadedNode</code>.
-     *
-     * @param callback
-     *            Receives the chosen hub, or a
-     *            {@link PubSubException.ServiceDownException} on failure.
-     * @param ctx
-     *            Opaque context object passed along with the request.
-     */
-    @Override
-    public void chooseLeastLoadedHub(final Callback<HubInfo> callback, Object ctx) {
-        zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx,
-                                          List<String> children) {
-                if (rc == Code.OK.intValue()) {
-                    chooseLeastLoadedNode(children, callback, ctx);
-                    return;
-                }
-                KeeperException listingError = ZkUtils.logErrorAndCreateZKException(
-                                                   "Could not get list of available hubs", path, rc);
-                callback.operationFailed(ctx, new PubSubException.ServiceDownException(listingError));
-            }
-        }, ctx);
-    }
-
-    /**
-     * Read the load stored in each child znode and report the hub with the
-     * smallest load; ties are broken randomly. Children whose data cannot be
-     * read or parsed are ignored.
-     *
-     * Two problems of the earlier version are addressed here: with an empty
-     * <code>children</code> list no read was issued and the callback was never
-     * invoked, and the callback used to receive the child znode name as its
-     * context because the inner callback's parameter shadowed <code>ctx</code>.
-     *
-     * @param children
-     *            Names of the hub znodes, each the string form of a hub address.
-     * @param callback
-     *            Receives the least loaded hub, or a
-     *            {@link PubSubException.ServiceDownException} if there is none
-     *            or its address cannot be parsed.
-     * @param ctx
-     *            Opaque context object handed back to the callback.
-     */
-    private void chooseLeastLoadedNode(final List<String> children,
-                                       final Callback<HubInfo> callback, final Object ctx) {
-        // Nothing to read means no data callback would ever fire, so the
-        // request has to be completed right here.
-        if (children.isEmpty()) {
-            callback.operationFailed(ctx,
-                                     new PubSubException.ServiceDownException("No hub available"));
-            return;
-        }
-
-        SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
-            int numResponses = 0;
-            HubLoad minLoad = HubLoad.MAX_LOAD;
-            String leastLoaded = null;
-            long leastLoadedCzxid = 0;
-
-            @Override
-            public void safeProcessResult(int rc, String path, Object dataCtx,
-                                          byte[] data, Stat stat) {
-                // dataCtx is the child name given to getData below; it is
-                // deliberately kept apart from the caller's ctx.
-                synchronized (this) {
-                    if (rc == KeeperException.Code.OK.intValue()) {
-                        try {
-                            HubLoad load = HubLoad.parse(new String(data, UTF_8));
-                            logger.debug("Found server {} with load: {}", dataCtx, load);
-                            int compareRes = load.compareTo(minLoad);
-                            if (compareRes < 0 || (compareRes == 0 && rand.nextBoolean())) {
-                                minLoad = load;
-                                leastLoaded = (String) dataCtx;
-                                leastLoadedCzxid = stat.getCzxid();
-                            }
-                        } catch (HubLoad.InvalidHubLoadException e) {
-                            logger.warn("Corrupted load information from hub : " + dataCtx);
-                            // some corrupted data, we'll just ignore this hub
-                        }
-                    }
-                    numResponses++;
-
-                    // Only the last response completes the request.
-                    if (numResponses == children.size()) {
-                        if (leastLoaded == null) {
-                            callback.operationFailed(ctx,
-                                                     new PubSubException.ServiceDownException("No hub available"));
-                            return;
-                        }
-                        try {
-                            HedwigSocketAddress owner = new HedwigSocketAddress(leastLoaded);
-                            callback.operationFinished(ctx, new HubInfo(owner, leastLoadedCzxid));
-                        } catch (Throwable t) {
-                            callback.operationFailed(ctx,
-                                                     new PubSubException.ServiceDownException("Least loaded hub server "
-                                                             + leastLoaded + " is invalid."));
-                        }
-                    }
-                }
-            }
-        };
-
-        for (String child : children) {
-            zk.getData(conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString(), false,
-                       dataCallback, child);
-        }
-    }
-
-    /**
-     * Get a map of all currently active hubs with their advertised load. The
-     * children of <code>hubNodesPath</code> are listed and the data of every
-     * child is read, all asynchronously.
-     *
-     * The callback fails if the listing fails, if there are no children at
-     * all, or if any child could not be read or parsed (the map would then
-     * hold fewer entries than hubs seen). The empty case is handled explicitly
-     * because no data callback would ever fire for it, leaving the caller,
-     * and with it the self-rescheduling rebalancer, waiting forever.
-     *
-     * @param callback
-     *            Receives the hub-to-load map or the failure.
-     * @param originalCtx
-     *            Opaque context object handed back to the callback.
-     */
-    private void getActiveHubsInfoWithLoad(final Callback<Map<HubInfo, HubLoad>> callback,
-                                           final Object originalCtx) {
-        zk.getChildren(hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() {
-            @Override
-            public void safeProcessResult(int rc, String path, Object ctx, final List<String> children) {
-                if (rc != Code.OK.intValue()) {
-                    KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                            "Could not get children for given path", path, rc);
-                    callback.operationFailed(originalCtx, new PubSubException.ServiceDownException(e));
-                    return;
-                }
-
-                // Without children no getData is issued below, so complete
-                // the request here instead of never calling back.
-                if (children.isEmpty()) {
-                    callback.operationFailed(originalCtx,
-                                             new PubSubException.ServiceDownException(
-                                                 "No active hubs found under " + path));
-                    return;
-                }
-
-                // The data callback shared by all child reads.
-                SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() {
-                    Map<HubInfo, HubLoad> loadMap = new HashMap<HubInfo, HubLoad>();
-                    int numResponse = 0;
-                    @Override
-                    public void safeProcessResult(int rc, String path, Object dataCtx,
-                                                  byte[] data, Stat stat) {
-                        synchronized (this) {
-                            if (rc == Code.OK.intValue()) {
-                                // dataCtx is the child name, i.e. the string
-                                // form of the hub's address.
-                                try {
-                                    HubInfo hubInfo =
-                                        new HubInfo(new HedwigSocketAddress((String)dataCtx), stat.getCzxid());
-                                    HubLoad hubLoad = HubLoad.parse(new String(data, UTF_8));
-                                    this.loadMap.put(hubInfo, hubLoad);
-                                } catch (HubLoad.InvalidHubLoadException e) {
-                                    logger.warn("Corrupt data found for a hub. Ignoring.");
-                                }
-                            }
-                            numResponse++;
-                            if (numResponse == children.size()) {
-                                // Fewer valid entries than hubs listed:
-                                // something was unreadable, signal an error.
-                                if (loadMap.size() != numResponse) {
-                                    callback.operationFailed(originalCtx,
-                                                             new PubSubException.UnexpectedConditionException(
-                                                                 "Fewer OK responses than the number of active hubs seen previously."));
-                                    return;
-                                }
-                                // We've seen all responses. All OK.
-                                callback.operationFinished(originalCtx, loadMap);
-                            }
-                        }
-                    }
-                };
-
-                for (String child : children) {
-                    String znode = conf.getZkHostsPrefix(new StringBuilder()).append("/").append(child).toString();
-                    zk.getData(znode, false, dataCallback, child);
-                }
-            }
-        }, originalCtx);
-    }
-
-    /**
-     * Fetch the load of all active hubs and, if a topic manager is present,
-     * let a {@link TopicBasedLoadShedder} decide whether this hub should shed
-     * load.
-     *
-     * @param tolerancePercentage
-     *            Tolerance passed on to the load shedder.
-     * @param maxLoadToShed
-     *            Upper bound on the load shed, passed on to the load shedder.
-     * @param callback
-     *            Receives false when there is no topic manager; otherwise it
-     *            is completed by the load shedder. Fails when the hub loads
-     *            cannot be fetched.
-     * @param ctx
-     *            Opaque context object handed back to the callback.
-     */
-    @Override
-    public void rebalanceCluster(final double tolerancePercentage, final HubLoad maxLoadToShed,
-                                 final Callback<Boolean> callback, final Object ctx) {
-        Callback<Map<HubInfo, HubLoad>> onLoads = new Callback<Map<HubInfo, HubLoad>>() {
-            @Override
-            public void operationFinished(Object ctx, Map<HubInfo, HubLoad> loadMap) {
-                if (tm != null) {
-                    new TopicBasedLoadShedder(tm, tolerancePercentage, maxLoadToShed)
-                        .shedLoad(loadMap, callback, ctx);
-                    return;
-                }
-                // No topic manager, so no load to shed.
-                callback.operationFinished(ctx, false);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException e) {
-                // Without the hub loads nothing can be decided; pass the
-                // failure on.
-                logger.error("Failed to get active hubs. Cannot attempt a rebalance.");
-                callback.operationFailed(ctx, e);
-            }
-        };
-        getActiveHubsInfoWithLoad(onLoads, ctx);
-    }
-
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
deleted file mode 100644
index 2424d27..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/ZkTopicManager.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.io.IOException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.SynchronousQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.data.Stat;
-
-import static com.google.common.base.Charsets.UTF_8;
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.ConcurrencyUtils;
-import org.apache.hedwig.util.Either;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
-import org.apache.hedwig.zookeeper.ZkUtils;
-
-/**
- * Topics are operated on in parallel as they are independent.
- *
- */
-public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
-
- private static final Logger logger = LoggerFactory.getLogger(ZkTopicManager.class);
-
- /**
- * Persistent storage for topic metadata.
- */
- private ZooKeeper zk;
-
- // hub server manager
- private final HubServerManager hubManager;
-
- private final HubInfo myHubInfo;
- private final HubLoad myHubLoad;
-
- // Boolean flag indicating if we should suspend activity. If this is true,
- // all of the Ops put into the queuer will fail automatically.
- protected volatile boolean isSuspended = false;
-
- /**
- * Create a new topic manager. Pass in an active ZooKeeper client object.
- *
- * The constructor creates a {@link ZkHubServerManager}, installs a listener
- * that mirrors suspend/resume into <code>isSuspended</code> and exits the
- * JVM with status 1 on shutdown, and then registers this hub. Registration
- * is asynchronous; its result is handed over through a
- * {@link SynchronousQueue} and the constructor takes it from there
- * (presumably blocking until it arrives -- see ConcurrencyUtils.take).
- *
- * @param zk
- * Active ZooKeeper client.
- * @param cfg
- * Server configuration, passed to the superclass and the hub
- * server manager.
- * @param scheduler
- * Scheduler passed to the superclass.
- * @throws UnknownHostException
- * declared for the superclass constructor.
- * @throws PubSubException
- * if registering this hub failed.
- */
- public ZkTopicManager(final ZooKeeper zk, final ServerConfiguration cfg, ScheduledExecutorService scheduler)
- throws UnknownHostException, PubSubException {
-
- super(cfg, scheduler);
- this.zk = zk;
- this.hubManager = new ZkHubServerManager(cfg, zk, addr, this);
-
- // Initial load is the number of topics currently in the topics cache.
- myHubLoad = new HubLoad(topics.size());
- this.hubManager.registerListener(new HubServerManager.ManagerListener() {
- @Override
- public void onSuspend() {
- isSuspended = true;
- }
- @Override
- public void onResume() {
- isSuspended = false;
- }
- @Override
- public void onShutdown() {
- // if hub server manager can't work, we had to quit
- Runtime.getRuntime().exit(1);
- }
- });
-
- // Hand-off point between the registration callback and this constructor.
- final SynchronousQueue<Either<HubInfo, PubSubException>> queue =
- new SynchronousQueue<Either<HubInfo, PubSubException>>();
- this.hubManager.registerSelf(myHubLoad, new Callback<HubInfo>() {
- @Override
- public void operationFinished(final Object ctx, final HubInfo resultOfOperation) {
- logger.info("Successfully registered hub {} with zookeeper", resultOfOperation);
- ConcurrencyUtils.put(queue, Either.of(resultOfOperation, (PubSubException) null));
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- logger.error("Failed to register hub with zookeeper", exception);
- ConcurrencyUtils.put(queue, Either.of((HubInfo)null, exception));
- }
- }, null);
-
- // Left side carries the hub info, right side the failure.
- Either<HubInfo, PubSubException> result = ConcurrencyUtils.take(queue);
- PubSubException pse = result.right();
- if (pse != null) {
- throw pse;
- }
- myHubInfo = result.left();
- }
-
- String hubPath(ByteString topic) {
- return cfg.getZkTopicPath(new StringBuilder(), topic).append("/hub").toString();
- }
-
-    /**
-     * Resolve the owner of a topic. While suspended (ZK client disconnected)
-     * the request fails at once; a topic already present in the local
-     * <code>topics</code> cache is answered with this host's address; any
-     * other topic is looked up in ZooKeeper through a {@link ZkGetOwnerOp}.
-     */
-    @Override
-    protected void realGetOwner(final ByteString topic, final boolean shouldClaim,
-                                final Callback<HedwigSocketAddress> cb, final Object ctx) {
-        if (isSuspended) {
-            cb.operationFailed(ctx, new PubSubException.ServiceDownException(
-                                   "ZKTopicManager service is temporarily suspended!"));
-            return;
-        }
-
-        boolean ownedLocally = (topics.getIfPresent(topic) != null);
-        if (!ownedLocally) {
-            new ZkGetOwnerOp(topic, shouldClaim, cb, ctx).read();
-            return;
-        }
-        cb.operationFinished(ctx, addr);
-    }
-
- // Recursively call each other.
- class ZkGetOwnerOp {
- ByteString topic;
- boolean shouldClaim;
- Callback<HedwigSocketAddress> cb;
- Object ctx;
- String hubPath;
-
- public ZkGetOwnerOp(ByteString topic, boolean shouldClaim, Callback<HedwigSocketAddress> cb, Object ctx) {
- this.topic = topic;
- this.shouldClaim = shouldClaim;
- this.cb = cb;
- this.ctx = ctx;
- hubPath = hubPath(topic);
-
- }
-
- public void choose() {
- hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() {
- @Override
- public void operationFinished(Object ctx, HubInfo owner) {
- logger.info("{} : Least loaded owner {} is chosen for topic {}",
- new Object[] { addr, owner, topic.toStringUtf8() });
- if (owner.getAddress().equals(addr)) {
- claim();
- } else {
- cb.operationFinished(ZkGetOwnerOp.this.ctx, owner.getAddress());
- }
- }
- @Override
- public void operationFailed(Object ctx, PubSubException pse) {
- logger.error("Failed to choose least loaded hub server for topic "
- + topic.toStringUtf8() + " : ", pse);
- cb.operationFailed(ctx, pse);
- }
- }, null);
- }
-
- public void claimOrChoose() {
- if (shouldClaim)
- claim();
- else
- choose();
- }
-
- /**
-  * Reads the ownership node of the topic and acts on its content.
-  * <ul>
-  * <li>No node: proceeds with {@code claimOrChoose()}.</li>
-  * <li>Read error: fails the caller's callback with a ServiceDownException.</li>
-  * <li>Node names another hub: reports that hub's address to the caller's callback.</li>
-  * <li>Node names this hub, holds unparsable hub info, or holds no data at all: the node is
-  * deleted (conditioned on the version that was read) and, once it is gone,
-  * {@code claimOrChoose()} is invoked.</li>
-  * </ul>
-  */
- public void read() {
-     zk.getData(hubPath, false, new SafeAsyncZKCallback.DataCallback() {
-         @Override
-         public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-
-             if (rc == Code.NONODE.intValue()) {
-                 claimOrChoose();
-                 return;
-             }
-
-             if (rc != Code.OK.intValue()) {
-                 KeeperException e = ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: "
-                                     + topic.toStringUtf8(), path, rc);
-                 cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                 return;
-             }
-
-             // successfully did a read
-             if (data == null) {
-                 // A node may exist without any data. There is nothing to parse, so treat it
-                 // like invalid hub info instead of failing with a NullPointerException.
-                 logger.info("Discovered empty hub info for topic: " + topic.toStringUtf8() + ", will delete it");
-             } else {
-                 try {
-                     HubInfo ownerHubInfo = HubInfo.parse(new String(data, UTF_8));
-                     HedwigSocketAddress owner = ownerHubInfo.getAddress();
-                     if (!owner.equals(addr)) {
-                         if (logger.isDebugEnabled()) {
-                             logger.debug("topic: " + topic.toStringUtf8() + " belongs to someone else: " + owner);
-                         }
-                         cb.operationFinished(ctx, owner);
-                         return;
-                     }
-                     logger.info("Discovered stale self-node for topic: " + topic.toStringUtf8() + ", will delete it");
-                 } catch (HubInfo.InvalidHubInfoException ihie) {
-                     logger.info("Discovered invalid hub info for topic: " + topic.toStringUtf8() + ", will delete it : ", ihie);
-                 }
-             }
-
-             // we must have previously failed and left a
-             // residual ephemeral node here, so we must
-             // delete it (clean it up) and then
-             // re-create/re-acquire the topic.
-             // Uses the SafeAsyncZKCallback variant, like every other ZooKeeper callback
-             // in this class.
-             zk.delete(hubPath, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() {
-                 @Override
-                 public void safeProcessResult(int rc, String path, Object ctx) {
-                     if (Code.OK.intValue() == rc || Code.NONODE.intValue() == rc) {
-                         claimOrChoose();
-                     } else {
-                         KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                                 "Could not delete self node for topic: " + topic.toStringUtf8(), path, rc);
-                         cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                     }
-                 }
-             }, ctx);
-         }
-     }, ctx);
- }
-
- /**
-  * Tries to take ownership of the topic by creating an ephemeral node at {@code hubPath}
-  * that carries this hub's info.
-  *
-  * On success the topic is passed to {@code notifyListenersAndAddToOwnedTopics} and the
-  * hub's load data is re-uploaded with the current topic count. If the node already exists
-  * the operation falls back to {@code read()}. Any other result fails the caller's callback
-  * with a ServiceDownException.
-  */
- public void claim() {
-     if (logger.isDebugEnabled()) {
-         logger.debug("claiming topic: " + topic.toStringUtf8());
-     }
-
-     final byte[] selfInfo = myHubInfo.toString().getBytes(UTF_8);
-     ZkUtils.createFullPathOptimistic(zk, hubPath, selfInfo, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,
-         new SafeAsyncZKCallback.StringCallback() {
-
-             @Override
-             public void safeProcessResult(int rc, String path, Object ctx, String name) {
-                 if (rc == Code.NODEEXISTS.intValue()) {
-                     // A node is already there; inspect it to find out who owns the topic.
-                     read();
-                     return;
-                 }
-
-                 if (rc != Code.OK.intValue()) {
-                     KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                             "Failed to create ephemeral node to claim ownership of topic: "
-                                             + topic.toStringUtf8(), path, rc);
-                     cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                     return;
-                 }
-
-                 // Node created: the topic is ours.
-                 if (logger.isDebugEnabled()) {
-                     logger.debug("claimed topic: " + topic.toStringUtf8());
-                 }
-                 notifyListenersAndAddToOwnedTopics(topic, cb, ctx);
-                 hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
-             }
-         }, ctx);
- }
-
- }
-
- /**
-  * Cleans up after a topic has been released: re-uploads this hub's load data and removes
-  * the topic's ownership node, but only if that node still names this hub.
-  *
-  * The callback is completed successfully when the node is already gone, belongs to
-  * another hub, carries missing or unparsable hub info (the node is then left untouched),
-  * or has been deleted. It is failed with a ServiceDownException when reading or deleting
-  * the node returns any other error.
-  *
-  * @param topic topic that was released
-  * @param cb    callback completed once the cleanup has finished or failed
-  * @param ctx   opaque context handed back to {@code cb}
-  */
- @Override
- protected void postReleaseCleanup(final ByteString topic, final Callback<Void> cb, Object ctx) {
-
-     // Reduce load. We've removed the topic from our topic set, so do this as well.
-     // When we reclaim the topic, we will increment the load again.
-     hubManager.uploadSelfLoadData(myHubLoad.setNumTopics(topics.size()));
-
-     zk.getData(hubPath(topic), false, new SafeAsyncZKCallback.DataCallback() {
-         @Override
-         public void safeProcessResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
-             if (rc == Code.NONODE.intValue()) {
-                 // Node has somehow disappeared from under us, live with it
-                 // since its a transient node
-                 logger.warn("While deleting self-node for topic: " + topic.toStringUtf8() + ", node not found");
-                 cb.operationFinished(ctx, null);
-                 return;
-             }
-
-             if (rc != Code.OK.intValue()) {
-                 KeeperException e = ZkUtils.logErrorAndCreateZKException(
-                                         "Failed to delete self-ownership node for topic: " + topic.toStringUtf8(), path, rc);
-                 cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                 return;
-             }
-
-             if (data == null) {
-                 // A node may exist without any data. Ownership cannot be verified, so
-                 // handle it like invalid hub info rather than failing with a
-                 // NullPointerException when decoding the bytes.
-                 logger.info("Empty hub info found when release topic "
-                             + topic.toStringUtf8() + ". Leaving untouched until next acquire action.");
-                 cb.operationFinished(ctx, null);
-                 return;
-             }
-
-             String hubInfoStr = new String(data, UTF_8);
-             try {
-                 HubInfo ownerHubInfo = HubInfo.parse(hubInfoStr);
-                 HedwigSocketAddress owner = ownerHubInfo.getAddress();
-                 if (!owner.equals(addr)) {
-                     logger.warn("Wanted to delete self-node for topic: " + topic.toStringUtf8() + " but node for "
-                                 + owner + " found, leaving untouched");
-                     // Not our node, someone else's, leave it alone
-                     cb.operationFinished(ctx, null);
-                     return;
-                 }
-             } catch (HubInfo.InvalidHubInfoException ihie) {
-                 logger.info("Invalid hub info " + hubInfoStr + " found when release topic "
-                             + topic.toStringUtf8() + ". Leaving untouched until next acquire action.");
-                 cb.operationFinished(ctx, null);
-                 return;
-             }
-
-             // The node is ours: delete it, conditioned on the version that was just read.
-             zk.delete(path, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() {
-                 @Override
-                 public void safeProcessResult(int rc, String path, Object ctx) {
-                     if (rc != Code.OK.intValue() && rc != Code.NONODE.intValue()) {
-                         KeeperException e = ZkUtils
-                                             .logErrorAndCreateZKException("Failed to delete self-ownership node for topic: "
-                                                     + topic.toStringUtf8(), path, rc);
-                         cb.operationFailed(ctx, new PubSubException.ServiceDownException(e));
-                         return;
-                     }
-
-                     cb.operationFinished(ctx, null);
-                 }
-             }, ctx);
-         }
-     }, ctx);
- }
-
- /**
-  * Stops this topic manager.
-  *
-  * The hub is first unregistered through the hub manager, which makes it unavailable from
-  * the hub servers list. An {@link IOException} raised while unregistering is logged and
-  * does not prevent the superclass {@code stop()} from running afterwards.
-  */
- @Override
- public void stop() {
-     try {
-         hubManager.unregisterSelf();
-     } catch (IOException ioe) {
-         // Unregistering failed; log it and carry on with the shutdown.
-         logger.error("Error unregistering hub server :", ioe);
-     }
-     super.stop();
- }
-
-}