You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:20 UTC
[15/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java
deleted file mode 100644
index bb14066..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/Tool.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.tools;
-
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.cli.BasicParser;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * A Tool Framework
- */
-public abstract class Tool {
-
- /**
- * Interface of a command to run in a tool.
- */
- protected interface Command {
- String getName();
- String getDescription();
- int runCmd(String[] args) throws Exception;
- void printUsage();
- }
-
- /**
- * {@link org.apache.commons.cli.Options} based command.
- */
- protected abstract static class OptsCommand implements Command {
-
- /**
- * @return options used by this command.
- */
- protected abstract Options getOptions();
-
- /**
- * @return usage of this command.
- */
- protected String getUsage() {
- return cmdName + " [options]";
- }
-
- /**
- * Run given command line <i>commandLine</i>.
- *
- * @param commandLine
- * command line to run.
- * @return return code of this command.
- * @throws Exception
- */
- protected abstract int runCmd(CommandLine commandLine) throws Exception;
-
- protected String cmdName;
- protected String description;
-
- protected OptsCommand(String name, String description) {
- this.cmdName = name;
- this.description = description;
- }
-
- @Override
- public String getName() {
- return cmdName;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
-
- @Override
- public int runCmd(String[] args) throws Exception {
- try {
- BasicParser parser = new BasicParser();
- CommandLine cmdline = parser.parse(getOptions(), args);
- return runCmd(cmdline);
- } catch (ParseException e) {
- printUsage();
- return -1;
- }
- }
-
- @Override
- public void printUsage() {
- HelpFormatter helpFormatter = new HelpFormatter();
- println(cmdName + ": " + getDescription());
- helpFormatter.printHelp(getUsage(), getOptions());
- }
- }
-
- public class HelpCommand implements Command {
-
- @Override
- public String getName() {
- return "help";
- }
-
- @Override
- public String getDescription() {
- return "describe the usage of this tool or its sub-commands.";
- }
-
- @Override
- public int runCmd(String[] args) throws Exception {
- if (args.length == 0) {
- printToolUsage();
- return -1;
- }
- String cmdName = args[0];
- Command command = commands.get(cmdName);
- if (null == command) {
- System.err.println("Unknown command " + cmdName);
- printToolUsage();
- return -1;
- }
- command.printUsage();
- println("");
- return 0;
- }
-
- @Override
- public void printUsage() {
- println(getName() + ": " + getDescription());
- println("");
- println("usage: " + getName() + " <command>");
- }
- }
-
- // Commands managed by a tool
- protected final Map<String, Command> commands =
- new TreeMap<String, Command>();
-
- protected Tool() {
- addCommand(new HelpCommand());
- }
-
- /**
- * @return tool name.
- */
- protected abstract String getName();
-
- /**
- * Add a command in this tool.
- *
- * @param command
- * command to run in this tool.
- */
- protected void addCommand(Command command) {
- commands.put(command.getName(), command);
- }
-
- /**
- * Print a message in this tool.
- *
- * @param msg
- * message to print
- */
- protected static void println(String msg) {
- System.out.println(msg);
- }
-
- /**
- * print tool usage.
- */
- protected void printToolUsage() {
- println("Usage: " + getName() + " <command>");
- println("");
- int maxKeyLength = 0;
- for (String key : commands.keySet()) {
- if (key.length() > maxKeyLength) {
- maxKeyLength = key.length();
- }
- }
- maxKeyLength += 2;
- for (Map.Entry<String, Command> entry : commands.entrySet()) {
- StringBuilder spacesBuilder = new StringBuilder();
- int numSpaces = maxKeyLength - entry.getKey().length();
- for (int i = 0; i < numSpaces; i++) {
- spacesBuilder.append(" ");
- }
- println("\t" + entry.getKey() + spacesBuilder.toString() + ": " + entry.getValue().getDescription());
- }
- println("");
- }
-
- public int run(String[] args) throws Exception {
- if (args.length <= 0) {
- printToolUsage();
- return -1;
- }
- String cmdName = args[0];
- Command cmd = commands.get(cmdName);
- if (null == cmd) {
- System.err.println("ERROR: Unknown command " + cmdName);
- printToolUsage();
- return -1;
- }
- // prepare new args
- String[] newArgs = new String[args.length - 1];
- System.arraycopy(args, 1, newArgs, 0, newArgs.length);
- return cmd.runCmd(newArgs);
- }
-
- public static void main(String args[]) {
- int rc = -1;
- if (args.length <= 0) {
- System.err.println("No tool to run.");
- System.err.println("");
- System.err.println("Usage : Tool <tool_class_name> <options>");
- System.exit(-1);
- }
- String toolClass = args[0];
- try {
- Tool tool = ReflectionUtils.newInstance(toolClass, Tool.class);
- String[] newArgs = new String[args.length - 1];
- System.arraycopy(args, 1, newArgs, 0, newArgs.length);
- rc = tool.run(newArgs);
- } catch (Throwable t) {
- System.err.println("Fail to run tool " + toolClass + " : ");
- t.printStackTrace();
- }
- System.exit(rc);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java
deleted file mode 100644
index e2125bc..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/tools/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Tools for distributedlog
- */
-package com.twitter.distributedlog.tools;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java
deleted file mode 100644
index dcc3f58..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/Allocator.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.twitter.distributedlog.io.AsyncCloseable;
-import com.twitter.distributedlog.io.AsyncDeleteable;
-import com.twitter.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
-
-import java.io.IOException;
-
-/**
- * A common interface to allocate <i>I</i> under transaction <i>T</i>.
- *
- * <h3>Usage Example</h3>
- *
- * Here is an example on demonstrating how `Allocator` works.
- *
- * <pre> {@code
- * Allocator<I, T, R> allocator = ...;
- *
- * // issue an allocate request
- * try {
- * allocator.allocate();
- * } catch (IOException ioe) {
- * // handle the exception
- * ...
- * return;
- * }
- *
- * // Start a transaction
- * final Transaction<T> txn = ...;
- *
- * // Try obtain object I
- * Future<I> tryObtainFuture = allocator.tryObtain(txn, new OpListener<I>() {
- * public void onCommit(I resource) {
- * // the obtain succeed, process with the resource
- * }
- * public void onAbort() {
- * // the obtain failed.
- * }
- * }).addFutureEventListener(new FutureEventListener() {
- * public void onSuccess(I resource) {
- * // the try obtain succeed. but the obtain has not been confirmed or aborted.
- * // execute the transaction to confirm if it could complete obtain
- * txn.execute();
- * }
- * public void onFailure(Throwable t) {
- * // handle the failure of try obtain
- * }
- * });
- *
- * }</pre>
- */
-public interface Allocator<I, T> extends AsyncCloseable, AsyncDeleteable {
-
- /**
- * Issue allocation request to allocate <i>I</i>.
- * The implementation should be non-blocking call.
- *
- * @throws IOException
- * if fail to request allocating a <i>I</i>.
- */
- void allocate() throws IOException;
-
- /**
- * Try obtaining an <i>I</i> in a given transaction <i>T</i>. The object obtained is tentative.
- * Whether the object is obtained or aborted is determined by the result of the execution. You could
- * register a listener under this `tryObtain` operation to know whether the object is obtained or
- * aborted.
- *
- * <p>
- * It is a typical two-phases operation on obtaining a resource from allocator.
- * The future returned by this method acts as a `prepare` operation, the resource is tentative obtained
- * from the allocator. The execution of the txn acts as a `commit` operation, the resource is confirmed
- * to be obtained by this transaction. <code>listener</code> is for the whole completion of the obtain.
- * <p>
- * <code>listener</code> is only triggered after `prepare` succeed. if `prepare` failed, no actions will
- * happen to the listener.
- *
- * @param txn
- * transaction.
- * @return future result returning <i>I</i> that would be obtained under transaction <code>txn</code>.
- */
- Future<I> tryObtain(Transaction<T> txn, OpListener<I> listener);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java
deleted file mode 100644
index 95ef3e2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/CommandLineUtils.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Optional;
-import org.apache.commons.cli.CommandLine;
-
-/**
- * Utils to commandline
- */
-public class CommandLineUtils {
-
- public static Optional<String> getOptionalStringArg(CommandLine cmdline, String arg) {
- if (cmdline.hasOption(arg)) {
- return Optional.of(cmdline.getOptionValue(arg));
- } else {
- return Optional.absent();
- }
- }
-
- public static Optional<Boolean> getOptionalBooleanArg(CommandLine cmdline, String arg) {
- if (cmdline.hasOption(arg)) {
- return Optional.of(true);
- } else {
- return Optional.absent();
- }
- }
-
- public static Optional<Integer> getOptionalIntegerArg(CommandLine cmdline, String arg) throws IllegalArgumentException {
- try {
- if (cmdline.hasOption(arg)) {
- return Optional.of(Integer.parseInt(cmdline.getOptionValue(arg)));
- } else {
- return Optional.absent();
- }
- } catch (NumberFormatException ex) {
- throw new IllegalArgumentException(arg + " is not a number");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java
deleted file mode 100644
index 46dd3b6..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/ConfUtils.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.config.ConcurrentConstConfiguration;
-import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Iterator;
-
-public class ConfUtils {
-
- /**
- * Load configurations with prefixed <i>section</i> from source configuration <i>srcConf</i> into
- * target configuration <i>targetConf</i>.
- *
- * @param targetConf
- * Target Configuration
- * @param srcConf
- * Source Configuration
- * @param section
- * Section Key
- */
- public static void loadConfiguration(Configuration targetConf, Configuration srcConf, String section) {
- Iterator confKeys = srcConf.getKeys();
- while (confKeys.hasNext()) {
- Object keyObject = confKeys.next();
- if (!(keyObject instanceof String)) {
- continue;
- }
- String key = (String) keyObject;
- if (key.startsWith(section)) {
- targetConf.setProperty(key.substring(section.length()), srcConf.getProperty(key));
- }
- }
- }
-
- /**
- * Create const dynamic configuration based on distributedlog configuration.
- *
- * @param conf
- * static distributedlog configuration.
- * @return dynamic configuration
- */
- public static DynamicDistributedLogConfiguration getConstDynConf(DistributedLogConfiguration conf) {
- ConcurrentConstConfiguration constConf = new ConcurrentConstConfiguration(conf);
- return new DynamicDistributedLogConfiguration(constConf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
deleted file mode 100644
index 2f9e091..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/DLUtils.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Objects;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.LogSegmentMetadata;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import org.apache.commons.lang.StringUtils;
-
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.List;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Utilities about DL implementations like uri, log segments, metadata serialization and deserialization.
- */
-public class DLUtils {
-
- /**
- * Find the log segment whose transaction ids are not less than provided <code>transactionId</code>.
- *
- * @param segments
- * segments to search
- * @param transactionId
- * transaction id to find
- * @return the first log segment whose transaction ids are not less than <code>transactionId</code>.
- */
- public static int findLogSegmentNotLessThanTxnId(List<LogSegmentMetadata> segments,
- long transactionId) {
- int found = -1;
- for (int i = segments.size() - 1; i >= 0; i--) {
- LogSegmentMetadata segment = segments.get(i);
- if (segment.getFirstTxId() <= transactionId) {
- found = i;
- break;
- }
- }
- if (found <= -1) {
- return -1;
- }
- if (found == 0 && segments.get(0).getFirstTxId() == transactionId) {
- return 0;
- }
- LogSegmentMetadata foundSegment = segments.get(found);
- if (foundSegment.getFirstTxId() == transactionId) {
- for (int i = found - 1; i >= 0; i--) {
- LogSegmentMetadata segment = segments.get(i);
- if (segment.isInProgress()) {
- break;
- }
- if (segment.getLastTxId() < transactionId) {
- break;
- }
- found = i;
- }
- return found;
- } else {
- if (foundSegment.isInProgress()
- || found == segments.size() - 1) {
- return found;
- }
- if (foundSegment.getLastTxId() >= transactionId) {
- return found;
- }
- return found + 1;
- }
- }
-
- /**
- * Assign next log segment sequence number based on a decreasing list of log segments.
- *
- * @param segmentListDesc
- * a decreasing list of log segments
- * @return null if no log segments was assigned a sequence number in <code>segmentListDesc</code>.
- * otherwise, return next log segment sequence number
- */
- public static Long nextLogSegmentSequenceNumber(List<LogSegmentMetadata> segmentListDesc) {
- int lastAssignedLogSegmentIdx = -1;
- Long lastAssignedLogSegmentSeqNo = null;
- Long nextLogSegmentSeqNo = null;
-
- for (int i = 0; i < segmentListDesc.size(); i++) {
- LogSegmentMetadata metadata = segmentListDesc.get(i);
- if (LogSegmentMetadata.supportsLogSegmentSequenceNo(metadata.getVersion())) {
- lastAssignedLogSegmentSeqNo = metadata.getLogSegmentSequenceNumber();
- lastAssignedLogSegmentIdx = i;
- break;
- }
- }
-
- if (null != lastAssignedLogSegmentSeqNo) {
- // latest log segment is assigned with a sequence number, start with next sequence number
- nextLogSegmentSeqNo = lastAssignedLogSegmentSeqNo + lastAssignedLogSegmentIdx + 1;
- }
- return nextLogSegmentSeqNo;
- }
-
- /**
- * Compute the start sequence id for <code>segment</code>, based on previous segment list
- * <code>segmentListDesc</code>.
- *
- * @param logSegmentDescList
- * list of segments in descending order
- * @param segment
- * segment to compute start sequence id for
- * @return start sequence id
- */
- public static long computeStartSequenceId(List<LogSegmentMetadata> logSegmentDescList,
- LogSegmentMetadata segment)
- throws UnexpectedException {
- long startSequenceId = 0L;
- for (LogSegmentMetadata metadata : logSegmentDescList) {
- if (metadata.getLogSegmentSequenceNumber() >= segment.getLogSegmentSequenceNumber()) {
- continue;
- } else if (metadata.getLogSegmentSequenceNumber() < (segment.getLogSegmentSequenceNumber() - 1)) {
- break;
- }
- if (metadata.isInProgress()) {
- throw new UnexpectedException("Should not complete log segment " + segment.getLogSegmentSequenceNumber()
- + " since it's previous log segment is still inprogress : " + logSegmentDescList);
- }
- if (metadata.supportsSequenceId()) {
- startSequenceId = metadata.getStartSequenceId() + metadata.getRecordCount();
- }
- }
- return startSequenceId;
- }
-
- /**
- * Deserialize log segment sequence number for bytes <code>data</code>.
- *
- * @param data
- * byte representation of log segment sequence number
- * @return log segment sequence number
- * @throws NumberFormatException if the bytes aren't valid
- */
- public static long deserializeLogSegmentSequenceNumber(byte[] data) {
- String seqNoStr = new String(data, UTF_8);
- return Long.parseLong(seqNoStr);
- }
-
- /**
- * Serilize log segment sequence number <code>logSegmentSeqNo</code> into bytes.
- *
- * @param logSegmentSeqNo
- * log segment sequence number
- * @return byte representation of log segment sequence number
- */
- public static byte[] serializeLogSegmentSequenceNumber(long logSegmentSeqNo) {
- return Long.toString(logSegmentSeqNo).getBytes(UTF_8);
- }
-
- /**
- * Deserialize log record transaction id for bytes <code>data</code>.
- *
- * @param data
- * byte representation of log record transaction id
- * @return log record transaction id
- * @throws NumberFormatException if the bytes aren't valid
- */
- public static long deserializeTransactionId(byte[] data) {
- String seqNoStr = new String(data, UTF_8);
- return Long.parseLong(seqNoStr);
- }
-
- /**
- * Serilize log record transaction id <code>transactionId</code> into bytes.
- *
- * @param transactionId
- * log record transaction id
- * @return byte representation of log record transaction id.
- */
- public static byte[] serializeTransactionId(long transactionId) {
- return Long.toString(transactionId).getBytes(UTF_8);
- }
-
- /**
- * Serialize log segment id into bytes.
- *
- * @param logSegmentId
- * log segment id
- * @return bytes representation of log segment id
- */
- public static byte[] logSegmentId2Bytes(long logSegmentId) {
- return Long.toString(logSegmentId).getBytes(UTF_8);
- }
-
- /**
- * Deserialize bytes into log segment id.
- *
- * @param data
- * bytes representation of log segment id
- * @return log segment id
- */
- public static long bytes2LogSegmentId(byte[] data) {
- return Long.parseLong(new String(data, UTF_8));
- }
-
- /**
- * Normalize the uri.
- *
- * @param uri the distributedlog uri.
- * @return the normalized uri
- */
- public static URI normalizeURI(URI uri) {
- checkNotNull(uri, "DistributedLog uri is null");
- String scheme = uri.getScheme();
- checkNotNull(scheme, "Invalid distributedlog uri : " + uri);
- scheme = scheme.toLowerCase();
- String[] schemeParts = StringUtils.split(scheme, '-');
- checkArgument(Objects.equal(DistributedLogConstants.SCHEME_PREFIX, schemeParts[0].toLowerCase()),
- "Unknown distributedlog scheme found : " + uri);
- URI normalizedUri;
- try {
- normalizedUri = new URI(
- schemeParts[0], // remove backend info
- uri.getAuthority(),
- uri.getPath(),
- uri.getQuery(),
- uri.getFragment());
- } catch (URISyntaxException e) {
- throw new IllegalArgumentException("Invalid distributedlog uri found : " + uri, e);
- }
- return normalizedUri;
- }
-
- private static String getHostIpLockClientId() {
- try {
- return InetAddress.getLocalHost().toString();
- } catch(Exception ex) {
- return DistributedLogConstants.UNKNOWN_CLIENT_ID;
- }
- }
-
- /**
- * Normalize the client id.
- *
- * @return the normalized client id.
- */
- public static String normalizeClientId(String clientId) {
- String normalizedClientId;
- if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
- normalizedClientId = getHostIpLockClientId();
- } else {
- normalizedClientId = clientId;
- }
- return normalizedClientId;
- }
-
- /**
- * Is it a reserved stream name in bkdl namespace?
- *
- * @param name
- * stream name
- * @return true if it is reserved name, otherwise false.
- */
- public static boolean isReservedStreamName(String name) {
- return name.startsWith(".");
- }
-
- /**
- * Validate the stream name.
- *
- * @param nameOfStream
- * name of stream
- * @throws InvalidStreamNameException
- */
- public static void validateName(String nameOfStream)
- throws InvalidStreamNameException {
- String reason = null;
- char chars[] = nameOfStream.toCharArray();
- char c;
- // validate the stream to see if meet zookeeper path's requirement
- for (int i = 0; i < chars.length; i++) {
- c = chars[i];
-
- if (c == 0) {
- reason = "null character not allowed @" + i;
- break;
- } else if (c == '/') {
- reason = "'/' not allowed @" + i;
- break;
- } else if (c > '\u0000' && c < '\u001f'
- || c > '\u007f' && c < '\u009F'
- || c > '\ud800' && c < '\uf8ff'
- || c > '\ufff0' && c < '\uffff') {
- reason = "invalid charater @" + i;
- break;
- }
- }
- if (null != reason) {
- throw new InvalidStreamNameException(nameOfStream, reason);
- }
- if (isReservedStreamName(nameOfStream)) {
- throw new InvalidStreamNameException(nameOfStream,
- "Stream Name is reserved");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java
deleted file mode 100644
index 64101b3..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FailpointUtils.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class FailpointUtils {
- static final Logger logger = LoggerFactory.getLogger(FailpointUtils.class);
-
- public enum FailPointName {
- FP_StartLogSegmentBeforeLedgerCreate,
- FP_StartLogSegmentAfterLedgerCreate,
- FP_StartLogSegmentAfterInProgressCreate,
- FP_StartLogSegmentOnAssignLogSegmentSequenceNumber,
- FP_FinalizeLedgerBeforeDelete,
- FP_TransmitBeforeAddEntry,
- FP_TransmitComplete,
- FP_WriteInternalLostLock,
- FP_TransmitFailGetBuffer,
- FP_LockUnlockCleanup,
- FP_LockTryCloseRaceCondition,
- FP_LockTryAcquire,
- FP_ZooKeeperConnectionLoss,
- FP_RecoverIncompleteLogSegments,
- FP_LogWriterIssuePending,
- }
-
- public static interface FailPointAction {
- boolean checkFailPoint() throws IOException;
- boolean checkFailPointNoThrow();
- }
-
- public static abstract class AbstractFailPointAction implements FailPointAction {
- @Override
- public boolean checkFailPointNoThrow() {
- try {
- return checkFailPoint();
- } catch (IOException ex) {
- logger.error("failpoint action raised unexpected exception");
- return true;
- }
- }
- }
-
- public static final FailPointAction DEFAULT_ACTION = new AbstractFailPointAction() {
- @Override
- public boolean checkFailPoint() throws IOException {
- return true;
- }
- };
-
- public static final FailPointAction THROW_ACTION = new AbstractFailPointAction() {
- @Override
- public boolean checkFailPoint() throws IOException {
- throw new IOException("Throw ioexception for failure point");
- }
- };
-
- public enum FailPointActions {
- FailPointAction_Default,
- FailPointAction_Throw
- }
-
- static ConcurrentHashMap<FailPointName, FailPointAction> failPointState =
- new ConcurrentHashMap<FailPointName, FailPointAction>();
-
- public static void setFailpoint(FailPointName failpoint, FailPointActions action) {
- FailPointAction fpAction = null;
- switch (action) {
- case FailPointAction_Default:
- fpAction = DEFAULT_ACTION;
- break;
- case FailPointAction_Throw:
- fpAction = THROW_ACTION;
- break;
- default:
- break;
- }
- setFailpoint(failpoint, fpAction);
- }
-
- public static void setFailpoint(FailPointName failpoint, FailPointAction action) {
- if (null != action) {
- failPointState.put(failpoint, action);
- }
- }
-
- public static void removeFailpoint(FailPointName failpoint) {
- failPointState.remove(failpoint);
- }
-
- public static boolean checkFailPoint(FailPointName failPoint) throws IOException {
- FailPointAction action = failPointState.get(failPoint);
-
- if (action == null) {
- return false;
- }
-
- try {
- return action.checkFailPoint();
- } catch (IOException ioe) {
- throw new IOException("Induced Exception at:" + failPoint, ioe);
- }
- }
-
- public static boolean checkFailPointNoThrow(FailPointName failPoint) {
- FailPointAction action = failPointState.get(failPoint);
-
- if (action == null) {
- return false;
- }
-
- return action.checkFailPointNoThrow();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
deleted file mode 100644
index f206a25..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.exceptions.BKTransmitException;
-import com.twitter.distributedlog.exceptions.LockingException;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.exceptions.ZKException;
-import com.twitter.distributedlog.stats.OpStatsListener;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureCancelledException;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
-import com.twitter.util.Try;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Utilities to process future
- */
-public class FutureUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(FutureUtils.class);
-
- public static class OrderedFutureEventListener<R>
- implements FutureEventListener<R> {
-
- public static <R> OrderedFutureEventListener<R> of(
- FutureEventListener<R> listener,
- OrderedScheduler scheduler,
- Object key) {
- return new OrderedFutureEventListener<R>(scheduler, key, listener);
- }
-
- private final OrderedScheduler scheduler;
- private final Object key;
- private final FutureEventListener<R> listener;
-
- private OrderedFutureEventListener(OrderedScheduler scheduler,
- Object key,
- FutureEventListener<R> listener) {
- this.scheduler = scheduler;
- this.key = key;
- this.listener = listener;
- }
-
- @Override
- public void onSuccess(final R value) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- listener.onSuccess(value);
- }
- });
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- listener.onFailure(cause);
- }
- });
- }
- }
-
- public static class FutureEventListenerRunnable<R>
- implements FutureEventListener<R> {
-
- public static <R> FutureEventListenerRunnable<R> of(
- FutureEventListener<R> listener,
- ExecutorService executorService) {
- return new FutureEventListenerRunnable<R>(executorService, listener);
- }
-
- private final ExecutorService executorService;
- private final FutureEventListener<R> listener;
-
- private FutureEventListenerRunnable(ExecutorService executorService,
- FutureEventListener<R> listener) {
- this.executorService = executorService;
- this.listener = listener;
- }
-
- @Override
- public void onSuccess(final R value) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- listener.onSuccess(value);
- }
- });
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- listener.onFailure(cause);
- }
- });
- }
- }
-
- private static class ListFutureProcessor<T, R>
- extends Function<Throwable, BoxedUnit>
- implements FutureEventListener<R>, Runnable {
-
- private volatile boolean interrupted = false;
- private final Iterator<T> itemsIter;
- private final Function<T, Future<R>> processFunc;
- private final Promise<List<R>> promise;
- private final List<R> results;
- private final ExecutorService callbackExecutor;
-
- ListFutureProcessor(List<T> items,
- Function<T, Future<R>> processFunc,
- ExecutorService callbackExecutor) {
- this.itemsIter = items.iterator();
- this.processFunc = processFunc;
- this.promise = new Promise<List<R>>();
- this.promise.setInterruptHandler(this);
- this.results = new ArrayList<R>();
- this.callbackExecutor = callbackExecutor;
- }
-
- @Override
- public BoxedUnit apply(Throwable cause) {
- interrupted = true;
- return BoxedUnit.UNIT;
- }
-
- @Override
- public void onSuccess(R value) {
- results.add(value);
- if (null == callbackExecutor) {
- run();
- } else {
- callbackExecutor.submit(this);
- }
- }
-
- @Override
- public void onFailure(final Throwable cause) {
- if (null == callbackExecutor) {
- promise.setException(cause);
- } else {
- callbackExecutor.submit(new Runnable() {
- @Override
- public void run() {
- promise.setException(cause);
- }
- });
- }
- }
-
- @Override
- public void run() {
- if (interrupted) {
- logger.debug("ListFutureProcessor is interrupted.");
- return;
- }
- if (!itemsIter.hasNext()) {
- promise.setValue(results);
- return;
- }
- processFunc.apply(itemsIter.next()).addEventListener(this);
- }
- }
-
- /**
- * Process the list of items one by one using the process function <i>processFunc</i>.
- * The process will be stopped immediately if it fails on processing any one.
- *
- * @param collection list of items
- * @param processFunc process function
- * @param callbackExecutor executor to process the item
- * @return future presents the list of processed results
- */
- public static <T, R> Future<List<R>> processList(List<T> collection,
- Function<T, Future<R>> processFunc,
- @Nullable ExecutorService callbackExecutor) {
- ListFutureProcessor<T, R> processor =
- new ListFutureProcessor<T, R>(collection, processFunc, callbackExecutor);
- if (null != callbackExecutor) {
- callbackExecutor.submit(processor);
- } else {
- processor.run();
- }
- return processor.promise;
- }
-
- /**
- * Add a event listener over <i>result</i> for collecting the operation stats.
- *
- * @param result result to listen on
- * @param opStatsLogger stats logger to record operations stats
- * @param stopwatch stop watch to time operation
- * @param <T>
- * @return result after registered the event listener
- */
- public static <T> Future<T> stats(Future<T> result,
- OpStatsLogger opStatsLogger,
- Stopwatch stopwatch) {
- return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch));
- }
-
- /**
- * Await for the result of the future and thrown bk related exceptions.
- *
- * @param result future to wait for
- * @return the result of future
- * @throws BKException when exceptions are thrown by the future. If there is unkown exceptions
- * thrown from the future, the exceptions will be wrapped into
- * {@link org.apache.bookkeeper.client.BKException.BKUnexpectedConditionException}.
- */
- public static <T> T bkResult(Future<T> result) throws BKException {
- try {
- return Await.result(result);
- } catch (BKException bke) {
- throw bke;
- } catch (InterruptedException ie) {
- throw BKException.create(BKException.Code.InterruptedException);
- } catch (Exception e) {
- logger.warn("Encountered unexpected exception on waiting bookkeeper results : ", e);
- throw BKException.create(BKException.Code.UnexpectedConditionException);
- }
- }
-
- /**
- * Return the bk exception return code for a <i>throwable</i>.
- *
- * @param throwable the cause of the exception
- * @return the bk exception return code. if the exception isn't bk exceptions,
- * it would return {@link BKException.Code#UnexpectedConditionException}.
- */
- public static int bkResultCode(Throwable throwable) {
- if (throwable instanceof BKException) {
- return ((BKException)throwable).getCode();
- }
- return BKException.Code.UnexpectedConditionException;
- }
-
- /**
- * Wait for the result until it completes.
- *
- * @param result result to wait
- * @return the result
- * @throws IOException when encountered exceptions on the result
- */
- public static <T> T result(Future<T> result) throws IOException {
- return result(result, Duration.Top());
- }
-
- /**
- * Wait for the result for a given <i>duration</i>.
- * <p>If the result is not ready within `duration`, an IOException will thrown wrapping with
- * corresponding {@link com.twitter.util.TimeoutException}.
- *
- * @param result result to wait
- * @param duration duration to wait
- * @return the result
- * @throws IOException when encountered exceptions on the result or waiting for the result.
- */
- public static <T> T result(Future<T> result, Duration duration)
- throws IOException {
- try {
- return Await.result(result, duration);
- } catch (KeeperException ke) {
- throw new ZKException("Encountered zookeeper exception on waiting result", ke);
- } catch (BKException bke) {
- throw new BKTransmitException("Encountered bookkeeper exception on waiting result", bke.getCode());
- } catch (IOException ioe) {
- throw ioe;
- } catch (InterruptedException ie) {
- throw new DLInterruptedException("Interrupted on waiting result", ie);
- } catch (Exception e) {
- throw new IOException("Encountered exception on waiting result", e);
- }
- }
-
- /**
- * Wait for the result of a lock operation.
- *
- * @param result result to wait
- * @param lockPath path of the lock
- * @return the result
- * @throws LockingException when encountered exceptions on the result of lock operation
- */
- public static <T> T lockResult(Future<T> result, String lockPath) throws LockingException {
- try {
- return Await.result(result);
- } catch (LockingException le) {
- throw le;
- } catch (Exception e) {
- throw new LockingException(lockPath, "Encountered exception on locking ", e);
- }
- }
-
- /**
- * Convert the <i>throwable</i> to zookeeper related exceptions.
- *
- * @param throwable cause
- * @param path zookeeper path
- * @return zookeeper related exceptions
- */
- public static Throwable zkException(Throwable throwable, String path) {
- if (throwable instanceof KeeperException) {
- return new ZKException("Encountered zookeeper exception on " + path, (KeeperException) throwable);
- } else if (throwable instanceof ZooKeeperClient.ZooKeeperConnectionException) {
- return new ZKException("Encountered zookeeper connection loss on " + path,
- KeeperException.Code.CONNECTIONLOSS);
- } else if (throwable instanceof InterruptedException) {
- return new DLInterruptedException("Interrupted on operating " + path, throwable);
- } else {
- return new UnexpectedException("Encountered unexpected exception on operatiing " + path, throwable);
- }
- }
-
- /**
- * Cancel the future. It would interrupt the future.
- *
- * @param future future to cancel
- */
- public static <T> void cancel(Future<T> future) {
- future.raise(new FutureCancelledException());
- }
-
- /**
- * Raise an exception to the <i>promise</i> within a given <i>timeout</i> period.
- * If the promise has been satisfied before raising, it won't change the state of the promise.
- *
- * @param promise promise to raise exception
- * @param timeout timeout period
- * @param unit timeout period unit
- * @param cause cause to raise
- * @param scheduler scheduler to execute raising exception
- * @param key the submit key used by the scheduler
- * @return the promise applied with the raise logic
- */
- public static <T> Promise<T> within(final Promise<T> promise,
- final long timeout,
- final TimeUnit unit,
- final Throwable cause,
- final OrderedScheduler scheduler,
- final Object key) {
- if (timeout < DistributedLogConstants.FUTURE_TIMEOUT_IMMEDIATE || promise.isDefined()) {
- return promise;
- }
- // schedule a timeout to raise timeout exception
- final java.util.concurrent.ScheduledFuture<?> task = scheduler.schedule(key, new Runnable() {
- @Override
- public void run() {
- if (!promise.isDefined() && FutureUtils.setException(promise, cause)) {
- logger.info("Raise exception", cause);
- }
- }
- }, timeout, unit);
- // when the promise is satisfied, cancel the timeout task
- promise.respond(new AbstractFunction1<Try<T>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Try<T> value) {
- if (!task.cancel(true)) {
- logger.debug("Failed to cancel the timeout task");
- }
- return BoxedUnit.UNIT;
- }
- });
- return promise;
- }
-
- /**
- * Satisfy the <i>promise</i> with provide value in an ordered scheduler.
- * <p>If the promise was already satisfied, nothing will be changed.
- *
- * @param promise promise to satisfy
- * @param value value to satisfy
- * @param scheduler scheduler to satisfy the promise with provided value
- * @param key the submit key of the ordered scheduler
- */
- public static <T> void setValue(final Promise<T> promise,
- final T value,
- OrderedScheduler scheduler,
- Object key) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- setValue(promise, value);
- }
- });
- }
-
- /**
- * Satisfy the <i>promise</i> with provide value.
- * <p>If the promise was already satisfied, nothing will be changed.
- *
- * @param promise promise to satisfy
- * @param value value to satisfy
- * @return true if successfully satisfy the future. false if the promise has been satisfied.
- */
- public static <T> boolean setValue(Promise<T> promise, T value) {
- boolean success = promise.updateIfEmpty(new Return<T>(value));
- if (!success) {
- logger.info("Result set multiple times. Value = '{}', New = 'Return({})'",
- promise.poll(), value);
- }
- return success;
- }
-
- /**
- * Satisfy the <i>promise</i> with provided <i>cause</i> in an ordered scheduler.
- *
- * @param promise promise to satisfy
- * @param throwable cause to satisfy
- * @param scheduler the scheduler to satisfy the promise
- * @param key submit key of the ordered scheduler
- */
- public static <T> void setException(final Promise<T> promise,
- final Throwable cause,
- OrderedScheduler scheduler,
- Object key) {
- scheduler.submit(key, new Runnable() {
- @Override
- public void run() {
- setException(promise, cause);
- }
- });
- }
-
- /**
- * Satisfy the <i>promise</i> with provided <i>cause</i>.
- *
- * @param promise promise to satisfy
- * @param cause cause to satisfy
- * @return true if successfully satisfy the future. false if the promise has been satisfied.
- */
- public static <T> boolean setException(Promise<T> promise, Throwable cause) {
- boolean success = promise.updateIfEmpty(new Throw<T>(cause));
- if (!success) {
- logger.info("Result set multiple times. Value = '{}', New = 'Throw({})'",
- promise.poll(), cause);
- }
- return success;
- }
-
- /**
- * Ignore exception from the <i>future</i>.
- *
- * @param future the original future
- * @return a transformed future ignores exceptions
- */
- public static <T> Promise<Void> ignore(Future<T> future) {
- return ignore(future, null);
- }
-
- /**
- * Ignore exception from the <i>future</i> and log <i>errorMsg</i> on exceptions
- *
- * @param future the original future
- * @param errorMsg the error message to log on exceptions
- * @return a transformed future ignores exceptions
- */
- public static <T> Promise<Void> ignore(Future<T> future, final String errorMsg) {
- final Promise<Void> promise = new Promise<Void>();
- future.addEventListener(new FutureEventListener<T>() {
- @Override
- public void onSuccess(T value) {
- setValue(promise, null);
- }
-
- @Override
- public void onFailure(Throwable cause) {
- if (null != errorMsg) {
- logger.error(errorMsg, cause);
- }
- setValue(promise, null);
- }
- });
- return promise;
- }
-
- /**
- * Create transmit exception from transmit result.
- *
- * @param transmitResult
- * transmit result (basically bk exception code)
- * @return transmit exception
- */
- public static BKTransmitException transmitException(int transmitResult) {
- return new BKTransmitException("Failed to write to bookkeeper; Error is ("
- + transmitResult + ") "
- + BKException.getMessage(transmitResult), transmitResult);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java
deleted file mode 100644
index e06023e..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredFuturePool.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import com.google.common.base.Stopwatch;
-
-import com.twitter.util.FuturePool;
-import com.twitter.util.FuturePool$;
-import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import scala.runtime.BoxedUnit;
-import scala.Function0;
-
-/**
- * {@link FuturePool} with exposed stats. This class is exposing following stats for helping understanding
- * the healthy of this thread pool executor.
- * <h3>Metrics</h3>
- * Stats are only exposed when <code>traceTaskExecution</code> is true.
- * <ul>
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on waiting
- * being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on executing.
- * <li>task_enqueue_time: opstats. measuring the characteristics about the time that tasks spent on submitting.
- * <li>tasks_pending: gauge. how many tasks are pending in this future pool.
- * </ul>
- */
-public class MonitoredFuturePool implements FuturePool {
- static final Logger LOG = LoggerFactory.getLogger(MonitoredFuturePool.class);
-
- private final FuturePool futurePool;
-
- private final StatsLogger statsLogger;
- private final OpStatsLogger taskPendingTime;
- private final OpStatsLogger taskExecutionTime;
- private final OpStatsLogger taskEnqueueTime;
- private final Counter taskPendingCounter;
-
- private final boolean traceTaskExecution;
- private final long traceTaskExecutionWarnTimeUs;
-
- class TimedFunction0<T> extends com.twitter.util.Function0<T> {
- private final Function0<T> function0;
- private Stopwatch pendingStopwatch = Stopwatch.createStarted();
-
- TimedFunction0(Function0<T> function0) {
- this.function0 = function0;
- this.pendingStopwatch = Stopwatch.createStarted();
- }
-
- @Override
- public T apply() {
- taskPendingTime.registerSuccessfulEvent(pendingStopwatch.elapsed(TimeUnit.MICROSECONDS));
- Stopwatch executionStopwatch = Stopwatch.createStarted();
- T result = function0.apply();
- taskExecutionTime.registerSuccessfulEvent(executionStopwatch.elapsed(TimeUnit.MICROSECONDS));
- long elapsed = executionStopwatch.elapsed(TimeUnit.MICROSECONDS);
- if (elapsed > traceTaskExecutionWarnTimeUs) {
- LOG.info("{} took too long {} microseconds", function0.toString(), elapsed);
- }
- return result;
- }
- }
-
- /**
- * Create a future pool with stats exposed.
- *
- * @param futurePool underlying future pool to execute futures
- * @param statsLogger stats logger to receive exposed stats
- * @param traceTaskExecution flag to enable/disable exposing stats about task execution
- * @param traceTaskExecutionWarnTimeUs flag to enable/disable logging slow tasks
- * whose execution time is above this value
- */
- public MonitoredFuturePool(FuturePool futurePool,
- StatsLogger statsLogger,
- boolean traceTaskExecution,
- long traceTaskExecutionWarnTimeUs) {
- this.futurePool = futurePool;
- this.traceTaskExecution = traceTaskExecution;
- this.traceTaskExecutionWarnTimeUs = traceTaskExecutionWarnTimeUs;
- this.statsLogger = statsLogger;
- this.taskPendingTime = statsLogger.getOpStatsLogger("task_pending_time");
- this.taskExecutionTime = statsLogger.getOpStatsLogger("task_execution_time");
- this.taskEnqueueTime = statsLogger.getOpStatsLogger("task_enqueue_time");
- this.taskPendingCounter = statsLogger.getCounter("tasks_pending");
- }
-
- @Override
- public <T> Future<T> apply(Function0<T> function0) {
- if (traceTaskExecution) {
- taskPendingCounter.inc();
- Stopwatch taskEnqueueStopwatch = Stopwatch.createStarted();
- Future<T> futureResult = futurePool.apply(new TimedFunction0<T>(function0));
- taskEnqueueTime.registerSuccessfulEvent(taskEnqueueStopwatch.elapsed(TimeUnit.MICROSECONDS));
- futureResult.ensure(new com.twitter.util.Function0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- taskPendingCounter.dec();
- return null;
- }
- });
- return futureResult;
- } else {
- return futurePool.apply(function0);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
deleted file mode 100644
index 75223f2..0000000
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/MonitoredScheduledThreadPoolExecutor.java
+++ /dev/null
@@ -1,257 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.util;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.MathUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * {@link ScheduledThreadPoolExecutor} with exposed stats. This class is exposing following stats for
- * helping understanding the healthy of this thread pool executor.
- * <h3>Metrics</h3>
- * <ul>
- * <li>pending_tasks: gauge. how many tasks are pending in this executor.
- * <li>completed_tasks: gauge. how many tasks are completed in this executor.
- * <li>total_tasks: gauge. how many tasks are submitted to this executor.
- * <li>task_pending_time: opstats. measuring the characteristics about the time that tasks spent on
- * waiting being executed.
- * <li>task_execution_time: opstats. measuring the characteristics about the time that tasks spent on
- * executing.
- * </ul>
- */
-public class MonitoredScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
- static final Logger LOG = LoggerFactory.getLogger(MonitoredScheduledThreadPoolExecutor.class);
-
- private class TimedRunnable implements Runnable {
-
- final Runnable runnable;
- final long enqueueNanos;
-
- TimedRunnable(Runnable runnable) {
- this.runnable = runnable;
- this.enqueueNanos = MathUtils.nowInNano();
- }
-
- @Override
- public void run() {
- long startNanos = MathUtils.nowInNano();
- long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
- taskPendingStats.registerSuccessfulEvent(pendingMicros);
- try {
- runnable.run();
- } finally {
- long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
- taskExecutionStats.registerSuccessfulEvent(executionMicros);
- }
- }
-
- @Override
- public String toString() {
- return runnable.toString();
- }
-
- @Override
- public int hashCode() {
- return runnable.hashCode();
- }
- }
-
- private class TimedCallable<T> implements Callable<T> {
-
- final Callable<T> task;
- final long enqueueNanos;
-
- TimedCallable(Callable<T> task) {
- this.task = task;
- this.enqueueNanos = MathUtils.nowInNano();
- }
-
- @Override
- public T call() throws Exception {
- long startNanos = MathUtils.nowInNano();
- long pendingMicros = TimeUnit.NANOSECONDS.toMicros(startNanos - enqueueNanos);
- taskPendingStats.registerSuccessfulEvent(pendingMicros);
- try {
- return task.call();
- } finally {
- long executionMicros = TimeUnit.NANOSECONDS.toMicros(MathUtils.nowInNano() - startNanos);
- taskExecutionStats.registerSuccessfulEvent(executionMicros);
- }
- }
- }
-
- protected final boolean traceTaskExecution;
- protected final OpStatsLogger taskExecutionStats;
- protected final OpStatsLogger taskPendingStats;
- protected final StatsLogger statsLogger;
- // Gauges and their labels
- private static final String pendingTasksGaugeLabel = "pending_tasks";
- private final Gauge<Number> pendingTasksGauge;
- private static final String completedTasksGaugeLabel = "completed_tasks";
- protected final Gauge<Number> completedTasksGauge;
- private static final String totalTasksGaugeLabel = "total_tasks";
- protected final Gauge<Number> totalTasksGauge;
-
- public MonitoredScheduledThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory,
- StatsLogger statsLogger,
- boolean traceTaskExecution) {
- super(corePoolSize, threadFactory);
- this.traceTaskExecution = traceTaskExecution;
- this.statsLogger = statsLogger;
- this.taskPendingStats = this.statsLogger.getOpStatsLogger("task_pending_time");
- this.taskExecutionStats = this.statsLogger.getOpStatsLogger("task_execution_time");
- this.pendingTasksGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return getQueue().size();
- }
- };
- this.completedTasksGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return getCompletedTaskCount();
- }
- };
- this.totalTasksGauge = new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- return getTaskCount();
- }
- };
-
- // outstanding tasks
- this.statsLogger.registerGauge(pendingTasksGaugeLabel, pendingTasksGauge);
- // completed tasks
- this.statsLogger.registerGauge(completedTasksGaugeLabel, completedTasksGauge);
- // total tasks
- this.statsLogger.registerGauge(totalTasksGaugeLabel, pendingTasksGauge);
- }
-
- private Runnable timedRunnable(Runnable r) {
- return traceTaskExecution ? new TimedRunnable(r) : r;
- }
-
- private <T> Callable<T> timedCallable(Callable<T> task) {
- return traceTaskExecution ? new TimedCallable<T>(task) : task;
- }
-
- @Override
- public Future<?> submit(Runnable task) {
- return super.submit(timedRunnable(task));
- }
-
- @Override
- public <T> Future<T> submit(Runnable task, T result) {
- return super.submit(timedRunnable(task), result);
- }
-
- @Override
- public <T> Future<T> submit(Callable<T> task) {
- return super.submit(timedCallable(task));
- }
-
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- super.afterExecute(r, t);
- Throwable hiddenThrowable = extractThrowable(r);
- if (hiddenThrowable != null)
- logAndHandle(hiddenThrowable, true);
-
- // The executor re-throws exceptions thrown by the task to the uncaught exception handler
- // so we don't need to pass the exception to the handler explicitly
- if (null != t) {
- logAndHandle(t, false);
- }
- }
-
- /**
- * The executor re-throws exceptions thrown by the task to the uncaught exception handler
- * so we only need to do anything if uncaught exception handler has not been se
- */
- private void logAndHandle(Throwable t, boolean passToHandler) {
- if (Thread.getDefaultUncaughtExceptionHandler() == null) {
- LOG.error("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
- }
- else {
- LOG.info("Unhandled exception on thread {}", Thread.currentThread().getName(), t);
- if (passToHandler) {
- Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
- }
- }
- }
-
-
- /**
- * Extract the exception (throwable) inside the ScheduledFutureTask
- * @param runnable - The runable that was executed
- * @return exception enclosed in the Runnable if any; null otherwise
- */
- private Throwable extractThrowable(Runnable runnable) {
- // Check for exceptions wrapped by FutureTask.
- // We do this by calling get(), which will cause it to throw any saved exception.
- // Check for isDone to prevent blocking
- if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone()) {
- try {
- ((Future<?>) runnable).get();
- } catch (CancellationException e) {
- LOG.debug("Task {} cancelled", runnable, e.getCause());
- } catch (InterruptedException e) {
- LOG.debug("Task {} was interrupted", runnable, e);
- } catch (ExecutionException e) {
- return e.getCause();
- }
- }
-
- return null;
- }
-
- void unregisterGauges() {
- this.statsLogger.unregisterGauge(pendingTasksGaugeLabel, pendingTasksGauge);
- this.statsLogger.unregisterGauge(completedTasksGaugeLabel, completedTasksGauge);
- this.statsLogger.unregisterGauge(totalTasksGaugeLabel, totalTasksGauge);
- }
-
-}