You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:31 UTC
[24/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java
index e16fcab..50c151f 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/TransactionIdOutOfOrderException.java
@@ -17,8 +17,6 @@
*/
package org.apache.distributedlog.exceptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-
/**
* An exception is thrown when a log writer attempts to write a record with out-of-order transaction id.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java
index 637886e..50c21aa 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnexpectedException.java
@@ -17,8 +17,6 @@
*/
package org.apache.distributedlog.exceptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-
/**
* An {@code UnexpectedException} is thrown when encountering unexpected conditions.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java
index 01fab89..7eb6ed5 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/UnsupportedMetadataVersionException.java
@@ -17,8 +17,6 @@
*/
package org.apache.distributedlog.exceptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-
/**
* Exception is thrown when found unsupported metadata version.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java
index d9001dd..8db1d50 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteCancelledException.java
@@ -17,8 +17,6 @@
*/
package org.apache.distributedlog.exceptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-
/**
* Signals when a write request is cancelled.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
index 6899dbf..1d9c2a9 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/exceptions/WriteException.java
@@ -17,8 +17,6 @@
*/
package org.apache.distributedlog.exceptions;
-import org.apache.distributedlog.thrift.service.StatusCode;
-
/**
* An exception on writing log records.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java
deleted file mode 100644
index 0a37d3a..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/ProtocolUtils.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import static com.google.common.base.Charsets.UTF_8;
-
-import org.apache.distributedlog.DLSN;
-import java.util.zip.CRC32;
-
-/**
- * With CRC embedded in the application, we have to keep track of per api crc. Ideally this
- * would be done by thrift.
- */
-public class ProtocolUtils {
-
- // For request payload checksum
- private static final ThreadLocal<CRC32> requestCRC = new ThreadLocal<CRC32>() {
- @Override
- protected CRC32 initialValue() {
- return new CRC32();
- }
- };
-
- /**
- * Generate crc32 for WriteOp.
- */
- public static Long writeOpCRC32(String stream, byte[] payload) {
- CRC32 crc = requestCRC.get();
- try {
- crc.update(stream.getBytes(UTF_8));
- crc.update(payload);
- return crc.getValue();
- } finally {
- crc.reset();
- }
- }
-
- /**
- * Generate crc32 for TruncateOp.
- */
- public static Long truncateOpCRC32(String stream, DLSN dlsn) {
- CRC32 crc = requestCRC.get();
- try {
- crc.update(stream.getBytes(UTF_8));
- crc.update(dlsn.serializeBytes());
- return crc.getValue();
- } finally {
- crc.reset();
- }
- }
-
- /**
- * Generate crc32 for any op which only passes a stream name.
- */
- public static Long streamOpCRC32(String stream) {
- CRC32 crc = requestCRC.get();
- try {
- crc.update(stream.getBytes(UTF_8));
- return crc.getValue();
- } finally {
- crc.reset();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/resources/findbugsExclude.xml b/distributedlog-protocol/src/main/resources/findbugsExclude.xml
index 55e50f6..5e1cd0e 100644
--- a/distributedlog-protocol/src/main/resources/findbugsExclude.xml
+++ b/distributedlog-protocol/src/main/resources/findbugsExclude.xml
@@ -17,10 +17,6 @@
//-->
<FindBugsFilter>
<Match>
- <!-- generated code, we can't be held responsible for findbugs in it //-->
- <Class name="~org\.apache\.distributedlog\.thrift.*" />
- </Match>
- <Match>
<!-- it is safe to store external bytes reference here. //-->
<Class name="org.apache.distributedlog.LogRecord" />
<Bug pattern="EI_EXPOSE_REP2" />
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-protocol/src/main/thrift/service.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/thrift/service.thrift b/distributedlog-protocol/src/main/thrift/service.thrift
deleted file mode 100644
index 45e1449..0000000
--- a/distributedlog-protocol/src/main/thrift/service.thrift
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-namespace java org.apache.distributedlog.thrift.service
-
-/* Response stats codes */
-enum StatusCode {
- /* 2xx: action requested by the client was received, understood, accepted and processed successfully. */
-
- /* standard response for successful requests. */
- SUCCESS = 200,
-
- /* 3xx: client must take additional action to complete the request. */
-
- /* client closed. */
- CLIENT_CLOSED = 301,
- /* found the stream in a different server, a redirection is required by client. */
- FOUND = 302,
-
- /* 4xx: client seems to have erred. */
-
- /* request is denied for some reason */
- REQUEST_DENIED = 403,
- /* request record too large */
- TOO_LARGE_RECORD = 413,
-
- /* 5xx: server failed to fulfill an apparently valid request. */
-
- /* Generic error message, given when no more specific message is suitable. */
- INTERNAL_SERVER_ERROR = 500,
- /* Not implemented */
- NOT_IMPLEMENTED = 501,
- /* Already Closed Exception */
- ALREADY_CLOSED = 502,
- /* Service is currently unavailable (because it is overloaded or down for maintenance). */
- SERVICE_UNAVAILABLE = 503,
- /* Locking exception */
- LOCKING_EXCEPTION = 504,
- /* ZooKeeper Errors */
- ZOOKEEPER_ERROR = 505,
- /* Metadata exception */
- METADATA_EXCEPTION = 506,
- /* BK Transmit Error */
- BK_TRANSMIT_ERROR = 507,
- /* Flush timeout */
- FLUSH_TIMEOUT = 508,
- /* Log empty */
- LOG_EMPTY = 509,
- /* Log not found */
- LOG_NOT_FOUND = 510,
- /* Truncated Transactions */
- TRUNCATED_TRANSACTION = 511,
- /* End of Stream */
- END_OF_STREAM = 512,
- /* Transaction Id Out of Order */
- TRANSACTION_OUT_OF_ORDER = 513,
- /* Write exception */
- WRITE_EXCEPTION = 514,
- /* Stream Unavailable */
- STREAM_UNAVAILABLE = 515,
- /* Write cancelled exception */
- WRITE_CANCELLED_EXCEPTION = 516,
- /* over-capacity/backpressure */
- OVER_CAPACITY = 517,
- /** stream exists but is not ready (recovering etc.).
- the difference between NOT_READY and UNAVAILABLE is that UNAVAILABLE
- indicates the stream is no longer owned by the proxy and we should
- redirect. NOT_READY indicates the stream exist at the proxy but isn't
- eady for writes. */
- STREAM_NOT_READY = 518,
- /* Region Unavailable */
- REGION_UNAVAILABLE = 519,
- /* Invalid Enveloped Entry */
- INVALID_ENVELOPED_ENTRY = 520,
- /* Unsupported metadata version */
- UNSUPPORTED_METADATA_VERSION = 521,
- /* Log Already Exists */
- LOG_EXISTS = 522,
- /* Checksum failed on the request */
- CHECKSUM_FAILED = 523,
- /* Overcapacity: too many streams */
- TOO_MANY_STREAMS = 524,
- /* Log Segment Not Found */
- LOG_SEGMENT_NOT_FOUND = 525,
- /* End of Log Segment */
- END_OF_LOG_SEGMENT = 526,
- /* Log Segment Is Truncated */
- LOG_SEGMENT_IS_TRUNCATED = 527,
-
- /* 6xx: unexpected */
-
- UNEXPECTED = 600,
- INTERRUPTED = 601,
- INVALID_STREAM_NAME = 602,
- ILLEGAL_STATE = 603,
-
- /* 10xx: reader exceptions */
-
- RETRYABLE_READ = 1000,
- LOG_READ_ERROR = 1001,
- /* Read cancelled exception */
- READ_CANCELLED_EXCEPTION = 1002,
-}
-
-/* Response Header */
-struct ResponseHeader {
- 1: required StatusCode code;
- 2: optional string errMsg;
- 3: optional string location;
-}
-
-/* Write Response */
-struct WriteResponse {
- 1: required ResponseHeader header;
- 2: optional string dlsn;
-}
-
-/* Bulk write response */
-struct BulkWriteResponse {
- 1: required ResponseHeader header;
- 2: optional list<WriteResponse> writeResponses;
-}
-
-/* Write Context */
-struct WriteContext {
- 1: optional set<string> triedHosts;
- 2: optional i64 crc32;
- 3: optional bool isRecordSet;
-}
-
-/* HeartBeat Options */
-struct HeartbeatOptions {
- 1: optional bool sendHeartBeatToReader;
-}
-
-/* Server Status */
-enum ServerStatus {
- /* service is writing and accepting new streams */
- WRITE_AND_ACCEPT = 100,
- /* service is only writing to old streams, not accepting new streams */
- WRITE_ONLY = 200,
- /* service is shutting down, will not write */
- DOWN = 300,
-}
-
-/* Server Info */
-struct ServerInfo {
- 1: optional map<string, string> ownerships;
- 2: optional ServerStatus serverStatus;
-}
-
-/* Client Info */
-struct ClientInfo {
- 1: optional string streamNameRegex;
- 2: optional bool getOwnerships;
-}
-
-service DistributedLogService {
-
- /* Deprecated */
- ServerInfo handshake();
-
- ServerInfo handshakeWithClientInfo(ClientInfo clientInfo);
-
- /* Deprecated */
- WriteResponse heartbeat(string stream, WriteContext ctx);
-
- WriteResponse heartbeatWithOptions(string stream, WriteContext ctx, HeartbeatOptions options);
-
- /* Deprecated */
- WriteResponse write(string stream, binary data);
-
- WriteResponse writeWithContext(string stream, binary data, WriteContext ctx);
-
- BulkWriteResponse writeBulkWithContext(string stream, list<binary> data, WriteContext ctx);
-
- WriteResponse truncate(string stream, string dlsn, WriteContext ctx);
-
- WriteResponse release(string stream, WriteContext ctx);
-
- WriteResponse create(string stream, WriteContext ctx);
-
- WriteResponse delete(string stream, WriteContext ctx);
-
- WriteResponse getOwner(string stream, WriteContext ctx);
-
- /* Admin Methods */
- void setAcceptNewStream(bool enabled);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml
new file mode 100644
index 0000000..7392d90
--- /dev/null
+++ b/distributedlog-proxy-client/pom.xml
@@ -0,0 +1,172 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>distributedlog-proxy-client</artifactId>
+ <name>Apache DistributedLog :: Proxy Client</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-proxy-protocol</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-core_2.11</artifactId>
+ <version>${finagle.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-thriftmux_2.11</artifactId>
+ <version>${finagle.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>finagle-serversets_2.11</artifactId>
+ <version>${finagle.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-protocol</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>${maven-jar-plugin.version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
+ <forkMode>always</forkMode>
+ <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
+ <properties>
+ <property>
+ <name>listener</name>
+ <value>org.apache.distributedlog.TimedOutTestsListener</value>
+ </property>
+ </properties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${maven-checkstyle-plugin.version}</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>${puppycrawl.checkstyle.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-build-tools</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>distributedlog/checkstyle.xml</configLocation>
+ <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+ <consoleOutput>true</consoleOutput>
+ <failOnViolation>true</failOnViolation>
+ <includeResources>false</includeResources>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java
new file mode 100644
index 0000000..57e2b5a
--- /dev/null
+++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/ClientConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client Config.
+ */
+public class ClientConfig {
+ int redirectBackoffStartMs = 25;
+ int redirectBackoffMaxMs = 100;
+ int maxRedirects = -1;
+ int requestTimeoutMs = -1;
+ boolean thriftmux = false;
+ boolean streamFailfast = false;
+ String streamNameRegex = ".*";
+ boolean handshakeWithClientInfo = true;
+ long periodicHandshakeIntervalMs = TimeUnit.MINUTES.toMillis(5);
+ long periodicOwnershipSyncIntervalMs = TimeUnit.MINUTES.toMillis(5);
+ boolean periodicDumpOwnershipCacheEnabled = false;
+ long periodicDumpOwnershipCacheIntervalMs = TimeUnit.MINUTES.toMillis(10);
+ boolean enableHandshakeTracing = false;
+ boolean enableChecksum = true;
+
+ public ClientConfig setMaxRedirects(int maxRedirects) {
+ this.maxRedirects = maxRedirects;
+ return this;
+ }
+
+ public int getMaxRedirects() {
+ return this.maxRedirects;
+ }
+
+ public ClientConfig setRequestTimeoutMs(int timeoutInMillis) {
+ this.requestTimeoutMs = timeoutInMillis;
+ return this;
+ }
+
+ public int getRequestTimeoutMs() {
+ return this.requestTimeoutMs;
+ }
+
+ public ClientConfig setRedirectBackoffStartMs(int ms) {
+ this.redirectBackoffStartMs = ms;
+ return this;
+ }
+
+ public int getRedirectBackoffStartMs() {
+ return this.redirectBackoffStartMs;
+ }
+
+ public ClientConfig setRedirectBackoffMaxMs(int ms) {
+ this.redirectBackoffMaxMs = ms;
+ return this;
+ }
+
+ public int getRedirectBackoffMaxMs() {
+ return this.redirectBackoffMaxMs;
+ }
+
+ public ClientConfig setThriftMux(boolean enabled) {
+ this.thriftmux = enabled;
+ return this;
+ }
+
+ public boolean getThriftMux() {
+ return this.thriftmux;
+ }
+
+ public ClientConfig setStreamFailfast(boolean enabled) {
+ this.streamFailfast = enabled;
+ return this;
+ }
+
+ public boolean getStreamFailfast() {
+ return this.streamFailfast;
+ }
+
+ public ClientConfig setStreamNameRegex(String nameRegex) {
+ checkNotNull(nameRegex);
+ this.streamNameRegex = nameRegex;
+ return this;
+ }
+
+ public String getStreamNameRegex() {
+ return this.streamNameRegex;
+ }
+
+ public ClientConfig setHandshakeWithClientInfo(boolean enabled) {
+ this.handshakeWithClientInfo = enabled;
+ return this;
+ }
+
+ public boolean getHandshakeWithClientInfo() {
+ return this.handshakeWithClientInfo;
+ }
+
+ public ClientConfig setPeriodicHandshakeIntervalMs(long intervalMs) {
+ this.periodicHandshakeIntervalMs = intervalMs;
+ return this;
+ }
+
+ public long getPeriodicHandshakeIntervalMs() {
+ return this.periodicHandshakeIntervalMs;
+ }
+
+ public ClientConfig setPeriodicOwnershipSyncIntervalMs(long intervalMs) {
+ this.periodicOwnershipSyncIntervalMs = intervalMs;
+ return this;
+ }
+
+ public long getPeriodicOwnershipSyncIntervalMs() {
+ return this.periodicOwnershipSyncIntervalMs;
+ }
+
+ public ClientConfig setPeriodicDumpOwnershipCacheEnabled(boolean enabled) {
+ this.periodicDumpOwnershipCacheEnabled = enabled;
+ return this;
+ }
+
+ public boolean isPeriodicDumpOwnershipCacheEnabled() {
+ return this.periodicDumpOwnershipCacheEnabled;
+ }
+
+ public ClientConfig setPeriodicDumpOwnershipCacheIntervalMs(long intervalMs) {
+ this.periodicDumpOwnershipCacheIntervalMs = intervalMs;
+ return this;
+ }
+
+ public long getPeriodicDumpOwnershipCacheIntervalMs() {
+ return this.periodicDumpOwnershipCacheIntervalMs;
+ }
+
+ public ClientConfig setHandshakeTracingEnabled(boolean enabled) {
+ this.enableHandshakeTracing = enabled;
+ return this;
+ }
+
+ public boolean isHandshakeTracingEnabled() {
+ return this.enableHandshakeTracing;
+ }
+
+ public ClientConfig setChecksumEnabled(boolean enabled) {
+ this.enableChecksum = enabled;
+ return this;
+ }
+
+ public boolean isChecksumEnabled() {
+ return this.enableChecksum;
+ }
+
+ public static ClientConfig newConfig(ClientConfig config) {
+ ClientConfig newConfig = new ClientConfig();
+ newConfig.setMaxRedirects(config.getMaxRedirects())
+ .setRequestTimeoutMs(config.getRequestTimeoutMs())
+ .setRedirectBackoffStartMs(config.getRedirectBackoffStartMs())
+ .setRedirectBackoffMaxMs(config.getRedirectBackoffMaxMs())
+ .setThriftMux(config.getThriftMux())
+ .setStreamFailfast(config.getStreamFailfast())
+ .setStreamNameRegex(config.getStreamNameRegex())
+ .setHandshakeWithClientInfo(config.getHandshakeWithClientInfo())
+ .setPeriodicHandshakeIntervalMs(config.getPeriodicHandshakeIntervalMs())
+ .setPeriodicDumpOwnershipCacheEnabled(config.isPeriodicDumpOwnershipCacheEnabled())
+ .setPeriodicDumpOwnershipCacheIntervalMs(config.getPeriodicDumpOwnershipCacheIntervalMs())
+ .setHandshakeTracingEnabled(config.isHandshakeTracingEnabled())
+ .setChecksumEnabled(config.isChecksumEnabled());
+ return newConfig;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
new file mode 100644
index 0000000..0ed93d0
--- /dev/null
+++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogClientImpl.java
@@ -0,0 +1,1199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.LogRecordSetBuffer;
+import org.apache.distributedlog.client.monitor.MonitorServiceClient;
+import org.apache.distributedlog.client.ownership.OwnershipCache;
+import org.apache.distributedlog.client.proxy.ClusterClient;
+import org.apache.distributedlog.client.proxy.HostProvider;
+import org.apache.distributedlog.client.proxy.ProxyClient;
+import org.apache.distributedlog.client.proxy.ProxyClientManager;
+import org.apache.distributedlog.client.proxy.ProxyListener;
+import org.apache.distributedlog.client.resolver.RegionResolver;
+import org.apache.distributedlog.client.routing.RoutingService;
+import org.apache.distributedlog.client.routing.RoutingService.RoutingContext;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.client.stats.OpStats;
+import org.apache.distributedlog.exceptions.DLClientClosedException;
+import org.apache.distributedlog.exceptions.ServiceUnavailableException;
+import org.apache.distributedlog.exceptions.StreamUnavailableException;
+import org.apache.distributedlog.protocol.util.ProtocolUtils;
+import org.apache.distributedlog.service.DLSocketAddress;
+import org.apache.distributedlog.service.DistributedLogClient;
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.ResponseHeader;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import org.apache.distributedlog.thrift.service.ServerStatus;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.finagle.CancelledRequestException;
+import com.twitter.finagle.ConnectionFailedException;
+import com.twitter.finagle.Failure;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.RequestTimeoutException;
+import com.twitter.finagle.ServiceException;
+import com.twitter.finagle.ServiceTimeoutException;
+import com.twitter.finagle.WriteException;
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.stats.StatsReceiver;
+import com.twitter.finagle.thrift.ClientId;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Throw;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.thrift.TApplicationException;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.Seq;
+import scala.runtime.AbstractFunction1;
+
+
+/**
+ * Implementation of distributedlog client.
+ */
+public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient,
+ RoutingService.RoutingListener, ProxyListener, HostProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);
+
+ private final String clientName;
+ private final ClientId clientId;
+ private final ClientConfig clientConfig;
+ private final RoutingService routingService;
+ private final ProxyClient.Builder clientBuilder;
+ private final boolean streamFailfast;
+ private final Pattern streamNameRegexPattern;
+
+ // Timer
+ private final HashedWheelTimer dlTimer;
+
+ // region resolver
+ private final RegionResolver regionResolver;
+
+ // Ownership maintenance
+ private final OwnershipCache ownershipCache;
+ // Channel/Client management
+ private final ProxyClientManager clientManager;
+ // Cluster Client (for routing service)
+ private final Optional<ClusterClient> clusterClient;
+
+ // Close Status
+ private boolean closed = false;
+ private final ReentrantReadWriteLock closeLock =
+ new ReentrantReadWriteLock();
+
+ abstract class StreamOp implements TimerTask {
+ final String stream;
+
+ final AtomicInteger tries = new AtomicInteger(0);
+ final RoutingContext routingContext = RoutingContext.of(regionResolver);
+ final WriteContext ctx = new WriteContext();
+ final Stopwatch stopwatch;
+ final OpStats opStats;
+ SocketAddress nextAddressToSend;
+
+ StreamOp(final String stream, final OpStats opStats) {
+ this.stream = stream;
+ this.stopwatch = Stopwatch.createStarted();
+ this.opStats = opStats;
+ }
+
+ boolean shouldTimeout() {
+ long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ return shouldTimeout(elapsedMs);
+ }
+
+ boolean shouldTimeout(long elapsedMs) {
+ return clientConfig.getRequestTimeoutMs() > 0
+ && elapsedMs >= clientConfig.getRequestTimeoutMs();
+ }
+
+ void send(SocketAddress address) {
+ long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ if (clientConfig.getMaxRedirects() > 0
+ && tries.get() >= clientConfig.getMaxRedirects()) {
+ fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
+ "Exhausted max redirects in " + elapsedMs + " ms"));
+ return;
+ } else if (shouldTimeout(elapsedMs)) {
+ fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
+ "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs()
+ + " in " + elapsedMs + " ms"));
+ return;
+ }
+ synchronized (this) {
+ String addrStr = address.toString();
+ if (ctx.isSetTriedHosts() && ctx.getTriedHosts().contains(addrStr)) {
+ nextAddressToSend = address;
+ dlTimer.newTimeout(this,
+ Math.min(clientConfig.getRedirectBackoffMaxMs(),
+ tries.get() * clientConfig.getRedirectBackoffStartMs()),
+ TimeUnit.MILLISECONDS);
+ } else {
+ doSend(address);
+ }
+ }
+ }
+
+ abstract Future<ResponseHeader> sendRequest(ProxyClient sc);
+
+ void doSend(SocketAddress address) {
+ ctx.addToTriedHosts(address.toString());
+ if (clientConfig.isChecksumEnabled()) {
+ Long crc32 = computeChecksum();
+ if (null != crc32) {
+ ctx.setCrc32(crc32);
+ }
+ }
+ tries.incrementAndGet();
+ sendWriteRequest(address, this);
+ }
+
+ void beforeComplete(ProxyClient sc, ResponseHeader responseHeader) {
+ ownershipCache.updateOwner(stream, sc.getAddress());
+ }
+
+ void complete(SocketAddress address) {
+ stopwatch.stop();
+ opStats.completeRequest(address,
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
+ }
+
+ void fail(SocketAddress address, Throwable t) {
+ stopwatch.stop();
+ opStats.failRequest(address,
+ stopwatch.elapsed(TimeUnit.MICROSECONDS), tries.get());
+ }
+
+ Long computeChecksum() {
+ return null;
+ }
+
+ @Override
+ public synchronized void run(Timeout timeout) throws Exception {
+ if (!timeout.isCancelled() && null != nextAddressToSend) {
+ doSend(nextAddressToSend);
+ } else {
+ fail(null, new CancelledRequestException());
+ }
+ }
+ }
+
+ class BulkWriteOp extends StreamOp {
+
+ final List<ByteBuffer> data;
+ final ArrayList<Promise<DLSN>> results;
+
+ BulkWriteOp(final String name, final List<ByteBuffer> data) {
+ super(name, clientStats.getOpStats("bulk_write"));
+ this.data = data;
+
+ // This could take a while (relatively speaking) for very large inputs. We probably don't want
+ // to go so large for other reasons though.
+ this.results = new ArrayList<Promise<DLSN>>(data.size());
+ for (int i = 0; i < data.size(); i++) {
+ checkNotNull(data.get(i));
+ this.results.add(new Promise<DLSN>());
+ }
+ }
+
+ @Override
+ Future<ResponseHeader> sendRequest(final ProxyClient sc) {
+ return sc.getService().writeBulkWithContext(stream, data, ctx)
+ .addEventListener(new FutureEventListener<BulkWriteResponse>() {
+ @Override
+ public void onSuccess(BulkWriteResponse response) {
+ // For non-success case, the ResponseHeader handler (the caller) will handle it.
+ // Note success in this case means no finagle errors have occurred
+ // (such as finagle connection issues). In general code != SUCCESS means there's some error
+ // reported by dlog service. The caller will handle such errors.
+ if (response.getHeader().getCode() == StatusCode.SUCCESS) {
+ beforeComplete(sc, response.getHeader());
+ BulkWriteOp.this.complete(sc.getAddress(), response);
+ if (response.getWriteResponses().size() == 0 && data.size() > 0) {
+ logger.error("non-empty bulk write got back empty response without failure for stream {}",
+ stream);
+ }
+ }
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ // Handled by the ResponseHeader listener (attached by the caller).
+ }
+ }).map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() {
+ @Override
+ public ResponseHeader apply(BulkWriteResponse response) {
+ // We need to return the ResponseHeader to the caller's listener to process DLOG errors.
+ return response.getHeader();
+ }
+ });
+ }
+
+ void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) {
+ super.complete(address);
+ Iterator<WriteResponse> writeResponseIterator = bulkWriteResponse.getWriteResponses().iterator();
+ Iterator<Promise<DLSN>> resultIterator = results.iterator();
+
+ // Fill in errors from thrift responses.
+ while (resultIterator.hasNext() && writeResponseIterator.hasNext()) {
+ Promise<DLSN> result = resultIterator.next();
+ WriteResponse writeResponse = writeResponseIterator.next();
+ if (StatusCode.SUCCESS == writeResponse.getHeader().getCode()) {
+ result.setValue(DLSN.deserialize(writeResponse.getDlsn()));
+ } else {
+ result.setException(ProtocolUtils.exception(writeResponse.getHeader()));
+ }
+ }
+
+ // Should never happen, but just in case so there's some record.
+ if (bulkWriteResponse.getWriteResponses().size() != data.size()) {
+ logger.error("wrong number of results, response = {} records = {}",
+ bulkWriteResponse.getWriteResponses().size(), data.size());
+ }
+ }
+
+ @Override
+ void fail(SocketAddress address, Throwable t) {
+
+ // StreamOp.fail is called to fail the overall request. In case of BulkWriteOp we take the request level
+ // exception to apply to the first write. In fact for request level exceptions no request has ever been
+ // attempted, but logically we associate the error with the first write.
+ super.fail(address, t);
+ Iterator<Promise<DLSN>> resultIterator = results.iterator();
+
+ // Fail the first write with the batch level failure.
+ if (resultIterator.hasNext()) {
+ Promise<DLSN> result = resultIterator.next();
+ result.setException(t);
+ }
+
+ // Fail the remaining writes as cancelled requests.
+ while (resultIterator.hasNext()) {
+ Promise<DLSN> result = resultIterator.next();
+ result.setException(new CancelledRequestException());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ List<Future<DLSN>> result() {
+ return (List) results;
+ }
+ }
+
+ abstract class AbstractWriteOp extends StreamOp {
+
+ final Promise<WriteResponse> result = new Promise<WriteResponse>();
+ Long crc32 = null;
+
+ AbstractWriteOp(final String name, final OpStats opStats) {
+ super(name, opStats);
+ }
+
+ void complete(SocketAddress address, WriteResponse response) {
+ super.complete(address);
+ result.setValue(response);
+ }
+
+ @Override
+ void fail(SocketAddress address, Throwable t) {
+ super.fail(address, t);
+ result.setException(t);
+ }
+
+ @Override
+ Long computeChecksum() {
+ if (null == crc32) {
+ crc32 = ProtocolUtils.streamOpCRC32(stream);
+ }
+ return crc32;
+ }
+
+ @Override
+ Future<ResponseHeader> sendRequest(final ProxyClient sc) {
+ return this.sendWriteRequest(sc).addEventListener(new FutureEventListener<WriteResponse>() {
+ @Override
+ public void onSuccess(WriteResponse response) {
+ if (response.getHeader().getCode() == StatusCode.SUCCESS) {
+ beforeComplete(sc, response.getHeader());
+ AbstractWriteOp.this.complete(sc.getAddress(), response);
+ }
+ }
+ @Override
+ public void onFailure(Throwable cause) {
+ // handled by the ResponseHeader listener
+ }
+ }).map(new AbstractFunction1<WriteResponse, ResponseHeader>() {
+ @Override
+ public ResponseHeader apply(WriteResponse response) {
+ return response.getHeader();
+ }
+ });
+ }
+
+ abstract Future<WriteResponse> sendWriteRequest(ProxyClient sc);
+ }
+
+ class WriteOp extends AbstractWriteOp {
+ final ByteBuffer data;
+
+ WriteOp(final String name, final ByteBuffer data) {
+ super(name, clientStats.getOpStats("write"));
+ this.data = data;
+ }
+
+ @Override
+ Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+ return sc.getService().writeWithContext(stream, data, ctx);
+ }
+
+ @Override
+ Long computeChecksum() {
+ if (null == crc32) {
+ byte[] dataBytes = new byte[data.remaining()];
+ data.duplicate().get(dataBytes);
+ crc32 = ProtocolUtils.writeOpCRC32(stream, dataBytes);
+ }
+ return crc32;
+ }
+
+ Future<DLSN> result() {
+ return result.map(new AbstractFunction1<WriteResponse, DLSN>() {
+ @Override
+ public DLSN apply(WriteResponse response) {
+ return DLSN.deserialize(response.getDlsn());
+ }
+ });
+ }
+ }
+
+ class TruncateOp extends AbstractWriteOp {
+ final DLSN dlsn;
+
+ TruncateOp(String name, DLSN dlsn) {
+ super(name, clientStats.getOpStats("truncate"));
+ this.dlsn = dlsn;
+ }
+
+ @Override
+ Long computeChecksum() {
+ if (null == crc32) {
+ crc32 = ProtocolUtils.truncateOpCRC32(stream, dlsn);
+ }
+ return crc32;
+ }
+
+ @Override
+ Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+ return sc.getService().truncate(stream, dlsn.serialize(), ctx);
+ }
+
+ Future<Boolean> result() {
+ return result.map(new AbstractFunction1<WriteResponse, Boolean>() {
+ @Override
+ public Boolean apply(WriteResponse response) {
+ return true;
+ }
+ });
+ }
+ }
+
+ class WriteRecordSetOp extends WriteOp {
+
+ WriteRecordSetOp(String name, LogRecordSetBuffer recordSet) {
+ super(name, recordSet.getBuffer());
+ ctx.setIsRecordSet(true);
+ }
+
+ }
+
+
+ class ReleaseOp extends AbstractWriteOp {
+
+ ReleaseOp(String name) {
+ super(name, clientStats.getOpStats("release"));
+ }
+
+ @Override
+ Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+ return sc.getService().release(stream, ctx);
+ }
+
+ @Override
+ void beforeComplete(ProxyClient sc, ResponseHeader header) {
+ ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
+ }
+
+ Future<Void> result() {
+ return result.map(new AbstractFunction1<WriteResponse, Void>() {
+ @Override
+ public Void apply(WriteResponse response) {
+ return null;
+ }
+ });
+ }
+ }
+
+ class DeleteOp extends AbstractWriteOp {
+
+ DeleteOp(String name) {
+ super(name, clientStats.getOpStats("delete"));
+ }
+
+ @Override
+ Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+ return sc.getService().delete(stream, ctx);
+ }
+
+ @Override
+ void beforeComplete(ProxyClient sc, ResponseHeader header) {
+ ownershipCache.removeOwnerFromStream(stream, sc.getAddress(), "Stream Deleted");
+ }
+
+ Future<Void> result() {
+ return result.map(new AbstractFunction1<WriteResponse, Void>() {
+ @Override
+ public Void apply(WriteResponse v1) {
+ return null;
+ }
+ });
+ }
+ }
+
+ class CreateOp extends AbstractWriteOp {
+
+ CreateOp(String name) {
+ super(name, clientStats.getOpStats("create"));
+ }
+
+ @Override
+ Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+ return sc.getService().create(stream, ctx);
+ }
+
+ @Override
+ void beforeComplete(ProxyClient sc, ResponseHeader header) {
+ ownershipCache.updateOwner(stream, sc.getAddress());
+ }
+
+ Future<Void> result() {
+ return result.map(new AbstractFunction1<WriteResponse, Void>() {
+ @Override
+ public Void apply(WriteResponse v1) {
+ return null;
+ }
+ }).voided();
+ }
+ }
+
+ class HeartbeatOp extends AbstractWriteOp {
+ HeartbeatOptions options;
+
+ HeartbeatOp(String name, boolean sendReaderHeartBeat) {
+ super(name, clientStats.getOpStats("heartbeat"));
+ options = new HeartbeatOptions();
+ options.setSendHeartBeatToReader(sendReaderHeartBeat);
+ }
+
+ @Override
+ Future<WriteResponse> sendWriteRequest(ProxyClient sc) {
+ return sc.getService().heartbeatWithOptions(stream, ctx, options);
+ }
+
+ Future<Void> result() {
+ return result.map(new AbstractFunction1<WriteResponse, Void>() {
+ @Override
+ public Void apply(WriteResponse response) {
+ return null;
+ }
+ });
+ }
+ }
+
+ // Stats
+ private final ClientStats clientStats;
+
+ public DistributedLogClientImpl(String name,
+ ClientId clientId,
+ RoutingService routingService,
+ ClientBuilder clientBuilder,
+ ClientConfig clientConfig,
+ Optional<ClusterClient> clusterClient,
+ StatsReceiver statsReceiver,
+ StatsReceiver streamStatsReceiver,
+ RegionResolver regionResolver,
+ boolean enableRegionStats) {
+ this.clientName = name;
+ this.clientId = clientId;
+ this.routingService = routingService;
+ this.clientConfig = clientConfig;
+ this.streamFailfast = clientConfig.getStreamFailfast();
+ this.streamNameRegexPattern = Pattern.compile(clientConfig.getStreamNameRegex());
+ this.regionResolver = regionResolver;
+ // Build the timer
+ this.dlTimer = new HashedWheelTimer(
+ new ThreadFactoryBuilder().setNameFormat("DLClient-" + name + "-timer-%d").build(),
+ this.clientConfig.getRedirectBackoffStartMs(),
+ TimeUnit.MILLISECONDS);
+ // register routing listener
+ this.routingService.registerListener(this);
+ // build the ownership cache
+ this.ownershipCache = new OwnershipCache(this.clientConfig, this.dlTimer, statsReceiver, streamStatsReceiver);
+ // Client Stats
+ this.clientStats = new ClientStats(statsReceiver, enableRegionStats, regionResolver);
+ // Client Manager
+ this.clientBuilder = ProxyClient.newBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
+ this.clientManager = new ProxyClientManager(
+ this.clientConfig, // client config
+ this.clientBuilder, // client builder
+ this.dlTimer, // timer
+ this, // host provider
+ clientStats); // client stats
+ this.clusterClient = clusterClient;
+ this.clientManager.registerProxyListener(this);
+
+ // Cache Stats
+ StatsReceiver cacheStatReceiver = statsReceiver.scope("cache");
+ Seq<String> numCachedStreamsGaugeName =
+ scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_streams")).toList();
+ cacheStatReceiver.provideGauge(numCachedStreamsGaugeName, new Function0<Object>() {
+ @Override
+ public Object apply() {
+ return (float) ownershipCache.getNumCachedStreams();
+ }
+ });
+ Seq<String> numCachedHostsGaugeName =
+ scala.collection.JavaConversions.asScalaBuffer(Arrays.asList("num_hosts")).toList();
+ cacheStatReceiver.provideGauge(numCachedHostsGaugeName, new Function0<Object>() {
+ @Override
+ public Object apply() {
+ return (float) clientManager.getNumProxies();
+ }
+ });
+
+ logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {},"
+ + " stats_receiver = {}, thriftmux = {}",
+ new Object[] {
+ name,
+ clientId,
+ routingService.getClass(),
+ statsReceiver.getClass(),
+ clientConfig.getThriftMux()
+ });
+ }
+
+ @Override
+ public Set<SocketAddress> getHosts() {
+ Set<SocketAddress> hosts = Sets.newHashSet();
+ // if using server side routing, we only handshake with the hosts in ownership cache.
+ if (!clusterClient.isPresent()) {
+ hosts.addAll(this.routingService.getHosts());
+ }
+ hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet());
+ return hosts;
+ }
+
+ @Override
+ public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+ if (null != serverInfo
+ && serverInfo.isSetServerStatus()
+ && ServerStatus.DOWN == serverInfo.getServerStatus()) {
+ logger.info("{} is detected as DOWN during handshaking", address);
+ // server is shutting down
+ handleServiceUnavailable(address, client, Optional.<StreamOp>absent());
+ return;
+ }
+
+ if (null != serverInfo && serverInfo.isSetOwnerships()) {
+ Map<String, String> ownerships = serverInfo.getOwnerships();
+ logger.debug("Handshaked with {} : {} ownerships returned.", address, ownerships.size());
+ for (Map.Entry<String, String> entry : ownerships.entrySet()) {
+ Matcher matcher = streamNameRegexPattern.matcher(entry.getKey());
+ if (!matcher.matches()) {
+ continue;
+ }
+ updateOwnership(entry.getKey(), entry.getValue());
+ }
+ } else {
+ logger.debug("Handshaked with {} : no ownerships returned", address);
+ }
+ }
+
+ @Override
+ public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+ cause = showRootCause(Optional.<StreamOp>absent(), cause);
+ handleRequestException(address, client, Optional.<StreamOp>absent(), cause);
+ }
+
+ @VisibleForTesting
+ public void handshake() {
+ clientManager.handshake();
+ logger.info("Handshaked with {} hosts, cached {} streams",
+ clientManager.getNumProxies(), ownershipCache.getNumCachedStreams());
+ }
+
+ @Override
+ public void onServerLeft(SocketAddress address) {
+ onServerLeft(address, null);
+ }
+
+ private void onServerLeft(SocketAddress address, ProxyClient sc) {
+ ownershipCache.removeAllStreamsFromOwner(address);
+ if (null == sc) {
+ clientManager.removeClient(address);
+ } else {
+ clientManager.removeClient(address, sc);
+ }
+ }
+
+ @Override
+ public void onServerJoin(SocketAddress address) {
+ // we only pre-create connection for client-side routing
+ // if it is server side routing, we only know the exact proxy address
+ // when #getOwner.
+ if (!clusterClient.isPresent()) {
+ clientManager.createClient(address);
+ }
+ }
+
+ public void close() {
+ closeLock.writeLock().lock();
+ try {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ } finally {
+ closeLock.writeLock().unlock();
+ }
+ clientManager.close();
+ routingService.unregisterListener(this);
+ routingService.stopService();
+ dlTimer.stop();
+ }
+
+ @Override
+ public Future<Void> check(String stream) {
+ final HeartbeatOp op = new HeartbeatOp(stream, false);
+ sendRequest(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<Void> heartbeat(String stream) {
+ final HeartbeatOp op = new HeartbeatOp(stream, true);
+ sendRequest(op);
+ return op.result();
+ }
+
+ @Override
+ public Map<SocketAddress, Set<String>> getStreamOwnershipDistribution() {
+ return ownershipCache.getStreamOwnershipDistribution();
+ }
+
+ @Override
+ public Future<Void> setAcceptNewStream(boolean enabled) {
+ Map<SocketAddress, ProxyClient> snapshot = clientManager.getAllClients();
+ List<Future<Void>> futures = new ArrayList<Future<Void>>(snapshot.size());
+ for (Map.Entry<SocketAddress, ProxyClient> entry : snapshot.entrySet()) {
+ futures.add(entry.getValue().getService().setAcceptNewStream(enabled));
+ }
+ return Future.collect(futures).map(new Function<List<Void>, Void>() {
+ @Override
+ public Void apply(List<Void> list) {
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public Future<DLSN> write(String stream, ByteBuffer data) {
+ final WriteOp op = new WriteOp(stream, data);
+ sendRequest(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<DLSN> writeRecordSet(String stream, final LogRecordSetBuffer recordSet) {
+ final WriteRecordSetOp op = new WriteRecordSetOp(stream, recordSet);
+ sendRequest(op);
+ return op.result();
+ }
+
+ @Override
+ public List<Future<DLSN>> writeBulk(String stream, List<ByteBuffer> data) {
+ if (data.size() > 0) {
+ final BulkWriteOp op = new BulkWriteOp(stream, data);
+ sendRequest(op);
+ return op.result();
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ @Override
+ public Future<Boolean> truncate(String stream, DLSN dlsn) {
+ final TruncateOp op = new TruncateOp(stream, dlsn);
+ sendRequest(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<Void> delete(String stream) {
+ final DeleteOp op = new DeleteOp(stream);
+ sendRequest(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<Void> release(String stream) {
+ final ReleaseOp op = new ReleaseOp(stream);
+ sendRequest(op);
+ return op.result();
+ }
+
+ @Override
+ public Future<Void> create(String stream) {
+ final CreateOp op = new CreateOp(stream);
+ sendRequest(op);
+ return op.result();
+ }
+
+ private void sendRequest(final StreamOp op) {
+ closeLock.readLock().lock();
+ try {
+ if (closed) {
+ op.fail(null, new DLClientClosedException("Client " + clientName + " is closed."));
+ } else {
+ doSend(op, null);
+ }
+ } finally {
+ closeLock.readLock().unlock();
+ }
+ }
+
+ /**
+ * Send the stream operation by routing service, excluding previous address if it is not null.
+ *
+ * @param op
+ * stream operation.
+ * @param previousAddr
+ * previous tried address.
+ */
+ private void doSend(final StreamOp op, final SocketAddress previousAddr) {
+ if (null != previousAddr) {
+ op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION);
+ }
+ // Get host first
+ final SocketAddress address = ownershipCache.getOwner(op.stream);
+ if (null == address || op.routingContext.isTriedHost(address)) {
+ getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() {
+ @Override
+ public void onFailure(Throwable cause) {
+ op.fail(null, cause);
+ }
+
+ @Override
+ public void onSuccess(SocketAddress ownerAddr) {
+ op.send(ownerAddr);
+ }
+ });
+ } else {
+ op.send(address);
+ }
+ }
+
+ private void retryGetOwnerFromResourcePlacementServer(final StreamOp op,
+ final Promise<SocketAddress> getOwnerPromise,
+ final Throwable cause) {
+ if (op.shouldTimeout()) {
+ op.fail(null, cause);
+ return;
+ }
+ getOwnerFromResourcePlacementServer(op, getOwnerPromise);
+ }
+
+ private void getOwnerFromResourcePlacementServer(final StreamOp op,
+ final Promise<SocketAddress> getOwnerPromise) {
+ clusterClient.get().getService().getOwner(op.stream, op.ctx)
+ .addEventListener(new FutureEventListener<WriteResponse>() {
+ @Override
+ public void onFailure(Throwable cause) {
+ getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause));
+ }
+
+ @Override
+ public void onSuccess(WriteResponse value) {
+ if (StatusCode.FOUND == value.getHeader().getCode()
+ && null != value.getHeader().getLocation()) {
+ try {
+ InetSocketAddress addr = DLSocketAddress.deserialize(
+ value.getHeader().getLocation()
+ ).getSocketAddress();
+ getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr));
+ } catch (IOException e) {
+ // retry from the routing server again
+ logger.error("ERROR in getOwner", e);
+ retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e);
+ return;
+ }
+ } else {
+ // retry from the routing server again
+ retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise,
+ new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown"));
+ }
+ }
+ });
+ }
+
+ private Future<SocketAddress> getOwner(final StreamOp op) {
+ if (clusterClient.isPresent()) {
+ final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>();
+ getOwnerFromResourcePlacementServer(op, getOwnerPromise);
+ return getOwnerPromise;
+ }
+ // pickup host by hashing
+ try {
+ return Future.value(routingService.getHost(op.stream, op.routingContext));
+ } catch (NoBrokersAvailableException nbae) {
+ return Future.exception(nbae);
+ }
+ }
+
+ private void sendWriteRequest(final SocketAddress addr, final StreamOp op) {
+ // Get corresponding finagle client
+ final ProxyClient sc = clientManager.getClient(addr);
+ final long startTimeNanos = System.nanoTime();
+ // write the request to that host.
+ op.sendRequest(sc).addEventListener(new FutureEventListener<ResponseHeader>() {
+ @Override
+ public void onSuccess(ResponseHeader header) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Received response; header: {}", header);
+ }
+ clientStats.completeProxyRequest(addr, header.getCode(), startTimeNanos);
+ // update routing context
+ op.routingContext.addTriedHost(addr, header.getCode());
+ switch (header.getCode()) {
+ case SUCCESS:
+ // success handling is done per stream op
+ break;
+ case FOUND:
+ handleRedirectResponse(header, op, addr);
+ break;
+ // for overcapacity, dont report failure since this normally happens quite a bit
+ case OVER_CAPACITY:
+ logger.debug("Failed to write request to {} : {}", op.stream, header);
+ op.fail(addr, ProtocolUtils.exception(header));
+ break;
+ // for responses that indicate the requests definitely failed,
+ // we should fail them immediately (e.g. TOO_LARGE_RECORD, METADATA_EXCEPTION)
+ case NOT_IMPLEMENTED:
+ case METADATA_EXCEPTION:
+ case LOG_EMPTY:
+ case LOG_NOT_FOUND:
+ case TRUNCATED_TRANSACTION:
+ case END_OF_STREAM:
+ case TRANSACTION_OUT_OF_ORDER:
+ case INVALID_STREAM_NAME:
+ case REQUEST_DENIED:
+ case TOO_LARGE_RECORD:
+ case CHECKSUM_FAILED:
+ // status code NOT_READY is returned if failfast is enabled in the server. don't redirect
+ // since the proxy may still own the stream.
+ case STREAM_NOT_READY:
+ op.fail(addr, ProtocolUtils.exception(header));
+ break;
+ case SERVICE_UNAVAILABLE:
+ handleServiceUnavailable(addr, sc, Optional.of(op));
+ break;
+ case REGION_UNAVAILABLE:
+ // region is unavailable, redirect the request to hosts in other region
+ redirect(op, null);
+ break;
+ // Proxy was overloaded and refused to try to acquire the stream. Don't remove ownership, since
+ // we didn't have it in the first place.
+ case TOO_MANY_STREAMS:
+ handleRedirectableError(addr, op, header);
+ break;
+ case STREAM_UNAVAILABLE:
+ case ZOOKEEPER_ERROR:
+ case LOCKING_EXCEPTION:
+ case UNEXPECTED:
+ case INTERRUPTED:
+ case BK_TRANSMIT_ERROR:
+ case FLUSH_TIMEOUT:
+ default:
+ // when we are receiving these exceptions from proxy, it means proxy or the stream is closed
+ // redirect the request.
+ ownershipCache.removeOwnerFromStream(op.stream, addr, header.getCode().name());
+ handleRedirectableError(addr, op, header);
+ break;
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ Optional<StreamOp> opOptional = Optional.of(op);
+ cause = showRootCause(opOptional, cause);
+ clientStats.failProxyRequest(addr, cause, startTimeNanos);
+ handleRequestException(addr, sc, opOptional, cause);
+ }
+ });
+ }
+
+ // Response Handlers
+
+ Throwable showRootCause(Optional<StreamOp> op, Throwable cause) {
+ if (cause instanceof Failure) {
+ Failure failure = (Failure) cause;
+ if (failure.isFlagged(Failure.Wrapped())) {
+ try {
+ // if it is a wrapped failure, unwrap it first
+ cause = failure.show();
+ } catch (IllegalArgumentException iae) {
+ if (op.isPresent()) {
+ logger.warn("Failed to unwrap finagle failure of stream {} : ", op.get().stream, iae);
+ } else {
+ logger.warn("Failed to unwrap finagle failure : ", iae);
+ }
+ }
+ }
+ }
+ return cause;
+ }
+
+ private void handleRedirectableError(SocketAddress addr,
+ StreamOp op,
+ ResponseHeader header) {
+ if (streamFailfast) {
+ op.fail(addr, ProtocolUtils.exception(header));
+ } else {
+ redirect(op, null);
+ }
+ }
+
+ void handleServiceUnavailable(SocketAddress addr,
+ ProxyClient sc,
+ Optional<StreamOp> op) {
+ // service is unavailable, remove it out of routing service
+ routingService.removeHost(addr, new ServiceUnavailableException(addr + " is unavailable now."));
+ onServerLeft(addr);
+ if (op.isPresent()) {
+ ownershipCache.removeOwnerFromStream(op.get().stream, addr, addr + " is unavailable now.");
+ // redirect the request to other host.
+ redirect(op.get(), null);
+ }
+ }
+
+ void handleRequestException(SocketAddress addr,
+ ProxyClient sc,
+ Optional<StreamOp> op,
+ Throwable cause) {
+ boolean resendOp = false;
+ boolean removeOwnerFromStream = false;
+ SocketAddress previousAddr = addr;
+ String reason = cause.getMessage();
+ if (cause instanceof ConnectionFailedException || cause instanceof java.net.ConnectException) {
+ routingService.removeHost(addr, cause);
+ onServerLeft(addr, sc);
+ removeOwnerFromStream = true;
+ // redirect the request to other host.
+ resendOp = true;
+ } else if (cause instanceof ChannelException) {
+ // java.net.ConnectException typically means connection is refused remotely
+ // no process listening on remote address/port.
+ if (cause.getCause() instanceof java.net.ConnectException) {
+ routingService.removeHost(addr, cause.getCause());
+ onServerLeft(addr);
+ reason = cause.getCause().getMessage();
+ } else {
+ routingService.removeHost(addr, cause);
+ reason = cause.getMessage();
+ }
+ removeOwnerFromStream = true;
+ // redirect the request to other host.
+ resendOp = true;
+ } else if (cause instanceof ServiceTimeoutException) {
+ // redirect the request to itself again, which will backoff for a while
+ resendOp = true;
+ previousAddr = null;
+ } else if (cause instanceof WriteException) {
+ // redirect the request to other host.
+ resendOp = true;
+ } else if (cause instanceof ServiceException) {
+ // redirect the request to other host.
+ clientManager.removeClient(addr, sc);
+ resendOp = true;
+ } else if (cause instanceof TApplicationException) {
+ handleTApplicationException(cause, op, addr, sc);
+ } else if (cause instanceof Failure) {
+ handleFinagleFailure((Failure) cause, op, addr);
+ } else {
+ // Default handler
+ handleException(cause, op, addr);
+ }
+
+ if (op.isPresent()) {
+ if (removeOwnerFromStream) {
+ ownershipCache.removeOwnerFromStream(op.get().stream, addr, reason);
+ }
+ if (resendOp) {
+ doSend(op.get(), previousAddr);
+ }
+ }
+ }
+
+ /**
+ * Redirect the request to new proxy <i>newAddr</i>. If <i>newAddr</i> is null,
+ * it would pick up a host from routing service.
+ *
+ * @param op
+ * stream operation
+ * @param newAddr
+ * new proxy address
+ */
+ void redirect(StreamOp op, SocketAddress newAddr) {
+ ownershipCache.getOwnershipStatsLogger().onRedirect(op.stream);
+ if (null != newAddr) {
+ logger.debug("Redirect request {} to new owner {}.", op, newAddr);
+ op.send(newAddr);
+ } else {
+ doSend(op, null);
+ }
+ }
+
+ void handleFinagleFailure(Failure failure,
+ Optional<StreamOp> op,
+ SocketAddress addr) {
+ if (failure.isFlagged(Failure.Restartable())) {
+ if (op.isPresent()) {
+ // redirect the request to other host
+ doSend(op.get(), addr);
+ }
+ } else {
+ // fail the request if it is other types of failures
+ handleException(failure, op, addr);
+ }
+ }
+
+ void handleException(Throwable cause,
+ Optional<StreamOp> op,
+ SocketAddress addr) {
+ // RequestTimeoutException: fail it and let client decide whether to retry or not.
+
+ // FailedFastException:
+ // We don't actually know when FailedFastException will be thrown
+ // so properly we just throw it back to application to let application
+ // handle it.
+
+ // Other Exceptions: as we don't know how to handle them properly so throw them to client
+ if (op.isPresent()) {
+ logger.error("Failed to write request to {} @ {} : {}",
+ new Object[]{op.get().stream, addr, cause.toString()});
+ op.get().fail(addr, cause);
+ }
+ }
+
+ void handleTApplicationException(Throwable cause,
+ Optional<StreamOp> op,
+ SocketAddress addr,
+ ProxyClient sc) {
+ TApplicationException ex = (TApplicationException) cause;
+ if (ex.getType() == TApplicationException.UNKNOWN_METHOD) {
+ // if we encountered unknown method exception on thrift server, it means this proxy
+ // has problem. we should remove it from routing service, clean up ownerships
+ routingService.removeHost(addr, cause);
+ onServerLeft(addr, sc);
+ if (op.isPresent()) {
+ ownershipCache.removeOwnerFromStream(op.get().stream, addr, cause.getMessage());
+ doSend(op.get(), addr);
+ }
+ } else {
+ handleException(cause, op, addr);
+ }
+ }
+
+ void handleRedirectResponse(ResponseHeader header, StreamOp op, SocketAddress curAddr) {
+ SocketAddress ownerAddr = null;
+ if (header.isSetLocation()) {
+ String owner = header.getLocation();
+ try {
+ ownerAddr = DLSocketAddress.deserialize(owner).getSocketAddress();
+ // if we are receiving a direct request to same host, we won't try the same host.
+ // as the proxy will shut itself down if it redirects client to itself.
+ if (curAddr.equals(ownerAddr)) {
+ logger.warn("Request to stream {} is redirected to same server {}!", op.stream, curAddr);
+ ownerAddr = null;
+ } else {
+ // update ownership when redirects.
+ ownershipCache.updateOwner(op.stream, ownerAddr);
+ }
+ } catch (IOException e) {
+ ownerAddr = null;
+ }
+ }
+ redirect(op, ownerAddr);
+ }
+
+ void updateOwnership(String stream, String location) {
+ try {
+ SocketAddress ownerAddr = DLSocketAddress.deserialize(location).getSocketAddress();
+ // update ownership
+ ownershipCache.updateOwner(stream, ownerAddr);
+ } catch (IOException e) {
+ logger.warn("Invalid ownership {} found for stream {} : ",
+ new Object[] { location, stream, e });
+ }
+ }
+
+}