You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zipkin.apache.org by ad...@apache.org on 2019/05/09 14:54:52 UTC
[incubator-zipkin] branch master updated: Ports the scribe
collector to armeria (#2563)
This is an automated email from the ASF dual-hosted git repository.
adriancole pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
The following commit(s) were added to refs/heads/master by this push:
new f419eae Ports the scribe collector to armeria (#2563)
f419eae is described below
commit f419eae55434db22825649515aa9499b05523014
Author: Anuraag Agrawal <an...@gmail.com>
AuthorDate: Thu May 9 23:54:43 2019 +0900
Ports the scribe collector to armeria (#2563)
---
pom.xml | 4 +
zipkin-collector/scribe/pom.xml | 69 +-
.../collector/scribe/NettyScribeServer.java | 80 ++
.../main/java/zipkin2/collector/scribe/Scribe.java | 58 --
.../zipkin2/collector/scribe/ScribeCollector.java | 21 +-
.../collector/scribe/ScribeInboundHandler.java | 223 +++++
.../collector/scribe/ScribeSpanConsumer.java | 50 +-
.../collector/scribe/generated/LogEntry.java | 482 +++++++++
.../collector/scribe/generated/ResultCode.java | 43 +
.../zipkin2/collector/scribe/generated/Scribe.java | 1045 ++++++++++++++++++++
.../collector/scribe/ITScribeCollector.java | 108 ++
.../collector/scribe/ScribeCollectorTest.java | 5 +-
.../collector/scribe/ScribeSpanConsumerTest.java | 110 ++-
13 files changed, 2102 insertions(+), 196 deletions(-)
diff --git a/pom.xml b/pom.xml
index e9903c2..856ac6a 100755
--- a/pom.xml
+++ b/pom.xml
@@ -672,6 +672,7 @@
<exclude>kafka_*/**</exclude>
<exclude>**/nohup.out</exclude>
<exclude>src/test/resources/**</exclude>
+ <exclude>**/generated/**</exclude>
</excludes>
<strictCheck>true</strictCheck>
</configuration>
@@ -732,6 +733,9 @@
<!-- GitHub files -->
<exclude>**/.github/**</exclude>
+ <!-- Generated files -->
+ <exclude>**/generated/**</exclude>
+
<!-- document files -->
<exclude>**/*.md</exclude>
diff --git a/zipkin-collector/scribe/pom.xml b/zipkin-collector/scribe/pom.xml
index a644c67..178323f 100644
--- a/zipkin-collector/scribe/pom.xml
+++ b/zipkin-collector/scribe/pom.xml
@@ -31,6 +31,9 @@
<properties>
<main.basedir>${project.basedir}/../..</main.basedir>
+
+ <!-- error-prone doesn't like generated code -->
+ <errorprone.args>-XepDisableWarningsInGeneratedCode</errorprone.args>
</properties>
<dependencies>
@@ -41,55 +44,23 @@
</dependency>
<dependency>
- <groupId>com.facebook.swift</groupId>
- <artifactId>swift-service</artifactId>
- <version>0.23.1</version>
- <exclusions>
- <!-- airlift:configuration includes bval-jsr303, which can easily conflict -->
- <exclusion>
- <groupId>io.airlift</groupId>
- <artifactId>configuration</artifactId>
- </exclusion>
- <exclusion>
- <!-- interferes with netty-tcnative-boringssl-static versions -->
- <groupId>com.facebook.nifty</groupId>
- <artifactId>nifty-ssl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.facebook.nifty</groupId>
- <artifactId>nifty-client</artifactId>
- </exclusion>
- <exclusion>
- <!-- huge and unused -->
- <groupId>org.weakref</groupId>
- <artifactId>jmxutils</artifactId>
- </exclusion>
- <!-- trying to reduce size -->
- <exclusion>
- <groupId>io.airlift</groupId>
- <artifactId>stats</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.inject</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.inject.extensions</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.inject</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>com.linecorp.armeria</groupId>
+ <artifactId>armeria-thrift</artifactId>
+ <version>${armeria.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.linecorp.armeria</groupId>
+ <artifactId>armeria-testing</artifactId>
+ <version>${armeria.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/NettyScribeServer.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/NettyScribeServer.java
new file mode 100644
index 0000000..009150e
--- /dev/null
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/NettyScribeServer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.collector.scribe;
+
+import com.linecorp.armeria.common.CommonPools;
+import com.linecorp.armeria.common.util.EventLoopGroups;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import java.net.InetSocketAddress;
+
+class NettyScribeServer {
+
+ private final int port;
+ private final ScribeSpanConsumer scribe;
+
+ volatile EventLoopGroup bossGroup;
+ volatile Channel channel;
+
+ NettyScribeServer(int port, ScribeSpanConsumer scribe) {
+ this.port = port;
+ this.scribe = scribe;
+ }
+
+ void start() {
+ bossGroup = EventLoopGroups.newEventLoopGroup(1);
+ EventLoopGroup workerGroup = CommonPools.workerGroup();
+
+ ServerBootstrap b = new ServerBootstrap();
+ try {
+ channel = b.group(bossGroup, workerGroup)
+ .channel(EventLoopGroups.serverChannelType(bossGroup))
+ .childHandler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ch.pipeline().addLast(new ScribeInboundHandler(scribe));
+ }
+ })
+ // Uses same value as the previously used swift library for consistency.
+ .childOption(ChannelOption.SO_BACKLOG, 1024)
+ .bind(port)
+ .syncUninterruptibly()
+ .channel();
+ } catch (Throwable t) {
+ throw new RuntimeException("Could not start scribe server.", t);
+ }
+ }
+
+ void close() {
+ if (channel == null) return;
+ channel.close();
+ bossGroup.shutdownGracefully();
+ }
+
+ boolean isRunning() {
+ return channel != null && channel.isActive();
+ }
+
+ int port() {
+ if (channel == null) return 0;
+ return ((InetSocketAddress) channel.localAddress()).getPort();
+ }
+}
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/Scribe.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/Scribe.java
deleted file mode 100644
index dae4eb2..0000000
--- a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/Scribe.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package zipkin2.collector.scribe;
-
-import com.facebook.swift.codec.ThriftEnumValue;
-import com.facebook.swift.codec.ThriftField;
-import com.facebook.swift.codec.ThriftStruct;
-import com.facebook.swift.service.ThriftMethod;
-import com.facebook.swift.service.ThriftService;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.util.List;
-
-@ThriftService("Scribe")
-public interface Scribe {
-
- @ThriftMethod(value = "Log")
- ListenableFuture<ResultCode> log(@ThriftField(value = 1) List<LogEntry> messages);
-
- enum ResultCode {
- OK(0),
- TRY_LATER(1);
-
- final int value;
-
- ResultCode(int value) {
- this.value = value;
- }
-
- @ThriftEnumValue
- public int value() {
- return value;
- }
- }
-
- @ThriftStruct(value = "LogEntry")
- final class LogEntry {
-
- @ThriftField(value = 1)
- public String category;
-
- @ThriftField(value = 2)
- public String message;
- }
-}
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeCollector.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeCollector.java
index a7fdd57..7d72bf3 100644
--- a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeCollector.java
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeCollector.java
@@ -16,10 +16,6 @@
*/
package zipkin2.collector.scribe;
-import com.facebook.swift.codec.ThriftCodecManager;
-import com.facebook.swift.service.ThriftServer;
-import com.facebook.swift.service.ThriftServerConfig;
-import com.facebook.swift.service.ThriftServiceProcessor;
import zipkin2.CheckResult;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorComponent;
@@ -28,9 +24,6 @@ import zipkin2.collector.CollectorSampler;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.StorageComponent;
-import static com.google.common.base.Preconditions.checkState;
-import static java.util.Collections.emptyList;
-
/**
* This collector accepts Scribe logs in a specified category. Each log entry is expected to contain
* a single span, which is TBinaryProtocol big-endian, then base64 encoded. These spans are chained
@@ -88,13 +81,11 @@ public final class ScribeCollector extends CollectorComponent {
}
}
- final ThriftServer server;
+ final NettyScribeServer server;
ScribeCollector(Builder builder) {
- ScribeSpanConsumer scribe = new ScribeSpanConsumer(builder);
- ThriftServiceProcessor processor =
- new ThriftServiceProcessor(new ThriftCodecManager(), emptyList(), scribe);
- server = new ThriftServer(processor, new ThriftServerConfig().setPort(builder.port));
+ server = new NettyScribeServer(builder.port, new ScribeSpanConsumer(
+ builder.delegate.build(), builder.metrics, builder.category));
}
/** Will throw an exception if the {@link Builder#port(int) port} is already in use. */
@@ -106,10 +97,8 @@ public final class ScribeCollector extends CollectorComponent {
@Override
public CheckResult check() {
- try {
- checkState(server.isRunning(), "server not running");
- } catch (RuntimeException e) {
- return CheckResult.failed(e);
+ if (!server.isRunning()) {
+ return CheckResult.failed(new IllegalStateException("server not running"));
}
return CheckResult.OK;
}
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeInboundHandler.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeInboundHandler.java
new file mode 100644
index 0000000..465809b
--- /dev/null
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeInboundHandler.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package zipkin2.collector.scribe;
+
+import com.linecorp.armeria.common.HttpData;
+import com.linecorp.armeria.common.HttpHeaderNames;
+import com.linecorp.armeria.common.HttpHeaders;
+import com.linecorp.armeria.common.HttpMethod;
+import com.linecorp.armeria.common.HttpRequest;
+import com.linecorp.armeria.common.HttpResponse;
+import com.linecorp.armeria.common.util.Exceptions;
+import com.linecorp.armeria.common.util.SafeCloseable;
+import com.linecorp.armeria.server.ServiceRequestContext;
+import com.linecorp.armeria.server.ServiceRequestContextBuilder;
+import com.linecorp.armeria.server.thrift.THttpService;
+import com.linecorp.armeria.unsafe.ByteBufHttpData;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufHolder;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.EventLoop;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ScribeInboundHandler extends ChannelInboundHandlerAdapter {
+
+ static final Logger logger = LoggerFactory.getLogger(ScribeInboundHandler.class);
+
+ // Headers mostly copied from https://github.com/apache/thrift/blob/master/lib/javame/src/org/apache/thrift/transport/THttpClient.java#L130
+ static final HttpHeaders THRIFT_HEADERS = HttpHeaders.of(
+ HttpMethod.POST, "/internal/zipkin-thriftrpc")
+ .set(HttpHeaderNames.CONTENT_TYPE, "application/x-thrift")
+ .set(HttpHeaderNames.ACCEPT, "application/x-thrift")
+ .set(HttpHeaderNames.USER_AGENT, "Zipkin/ScribeInboundHandler")
+ .asImmutable();
+
+ final THttpService scribeService;
+
+ ScribeInboundHandler(ScribeSpanConsumer scribe) {
+ scribeService = THttpService.of(scribe);
+ }
+
+ enum ReadState {
+ HEADER,
+ PAYLOAD
+ }
+
+ CompositeByteBuf pending;
+ ReadState state;
+ int nextFrameSize;
+
+ Map<Integer, ByteBuf> pendingResponses = new HashMap<>();
+ int nextResponseIndex = 0;
+ int previouslySentResponseIndex = -1;
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ pending = ctx.alloc().compositeBuffer();
+ state = ReadState.HEADER;
+ }
+
+ @Override
+ public void channelRead(final ChannelHandlerContext ctx, Object msg) {
+ if (pending == null) {
+ // Already closed (probably due to an exception).
+ return;
+ }
+
+ assert msg instanceof ByteBuf;
+ ByteBuf buf = (ByteBuf) msg;
+ pending.addComponent(true, buf);
+
+ switch (state) {
+ case HEADER:
+ maybeReadHeader(ctx);
+ break;
+ case PAYLOAD:
+ maybeReadPayload(ctx);
+ break;
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ release();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ Exceptions.logIfUnexpected(logger, ctx.channel(), cause);
+
+ release();
+ closeOnFlush(ctx.channel());
+ }
+
+ void maybeReadHeader(ChannelHandlerContext ctx) {
+ if (pending.readableBytes() < 4) {
+ return;
+ }
+ nextFrameSize = pending.readInt();
+ state = ReadState.PAYLOAD;
+ maybeReadPayload(ctx);
+ }
+
+ void maybeReadPayload(ChannelHandlerContext ctx) {
+ if (pending.readableBytes() < nextFrameSize) {
+ return;
+ }
+
+ ByteBuf payload = ctx.alloc().buffer(nextFrameSize);
+ pending.readBytes(payload, nextFrameSize);
+ pending.discardSomeReadBytes();
+
+ state = ReadState.HEADER;
+
+ HttpRequest request = HttpRequest.of(
+ THRIFT_HEADERS.toMutable(),
+ new ByteBufHttpData(payload, true));
+ ServiceRequestContextBuilder requestContextBuilder = ServiceRequestContextBuilder.of(request)
+ .service(scribeService)
+ .alloc(ctx.alloc());
+
+ if (ctx.executor() instanceof EventLoop) {
+ requestContextBuilder.eventLoop((EventLoop) ctx.executor());
+ }
+
+ ServiceRequestContext requestContext = requestContextBuilder.build();
+
+ final HttpResponse response;
+ try (SafeCloseable unused = requestContext.push()){
+ response = scribeService.serve(requestContext, request);
+ } catch (Exception e) {
+ exceptionCaught(ctx, e);
+ return;
+ }
+
+ int responseIndex = nextResponseIndex++;
+
+ response.aggregateWithPooledObjects(ctx.executor(), ctx.alloc()).handle((msg, t) -> {
+ if (t != null) {
+ exceptionCaught(ctx, t);
+ return null;
+ }
+
+ HttpData content = msg.content();
+ ByteBuf returned = ctx.alloc().buffer(content.length() + 4);
+ returned.writeInt(content.length());
+
+ if (content instanceof ByteBufHolder) {
+ ByteBuf buf = ((ByteBufHolder) content).content();
+ try {
+ returned.writeBytes(((ByteBufHolder) content).content());
+ } finally {
+ buf.release();
+ }
+ } else {
+ returned.writeBytes(content.array(), content.offset(), content.length());
+ }
+
+ if (responseIndex == previouslySentResponseIndex + 1) {
+ ctx.writeAndFlush(returned);
+ previouslySentResponseIndex++;
+
+ flushResponses(ctx);
+ } else {
+ pendingResponses.put(responseIndex, returned);
+ }
+
+ return null;
+ });
+ }
+
+ void flushResponses(ChannelHandlerContext ctx) {
+ while (!pendingResponses.isEmpty()) {
+ ByteBuf response = pendingResponses.remove(previouslySentResponseIndex + 1);
+ if (response == null) {
+ return;
+ }
+
+ ctx.writeAndFlush(response);
+ previouslySentResponseIndex++;
+ }
+ }
+
+ void release() {
+ if (pending != null) {
+ pending.release();
+ pending = null;
+ }
+
+ pendingResponses.values().forEach(ByteBuf::release);
+ pendingResponses.clear();
+ }
+
+ /**
+ * Closes the specified channel after all queued write requests are flushed.
+ */
+ static void closeOnFlush(Channel ch) {
+ if (ch.isActive()) {
+ ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+}
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
index 644514a..35ea7dc 100644
--- a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/ScribeSpanConsumer.java
@@ -16,39 +16,39 @@
*/
package zipkin2.collector.scribe;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
-import java.util.stream.Collectors;
+import org.apache.thrift.async.AsyncMethodCallback;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.codec.SpanBytesDecoder;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;
+import zipkin2.collector.scribe.generated.LogEntry;
+import zipkin2.collector.scribe.generated.ResultCode;
+import zipkin2.collector.scribe.generated.Scribe;
import zipkin2.internal.Nullable;
-final class ScribeSpanConsumer implements Scribe {
+class ScribeSpanConsumer implements Scribe.AsyncIface {
final Collector collector;
final CollectorMetrics metrics;
final String category;
- ScribeSpanConsumer(ScribeCollector.Builder builder) {
- this.collector = builder.delegate.build();
- this.metrics = builder.metrics;
- this.category = builder.category;
+ ScribeSpanConsumer(Collector collector, CollectorMetrics metrics, String category) {
+ this.collector = collector;
+ this.metrics = metrics;
+ this.category = category;
}
@Override
- public ListenableFuture<ResultCode> log(List<LogEntry> logEntries) {
+ public void Log(List<LogEntry> messages, AsyncMethodCallback<ResultCode> resultHandler) {
metrics.incrementMessages();
List<Span> spans = new ArrayList<>();
int byteCount = 0;
try {
- for (LogEntry logEntry : logEntries) {
+ for (LogEntry logEntry : messages) {
if (!category.equals(logEntry.category)) continue;
byte[] bytes = logEntry.message.getBytes(StandardCharsets.ISO_8859_1);
bytes = Base64.getMimeDecoder().decode(bytes); // finagle-zipkin uses mime encoding
@@ -57,25 +57,25 @@ final class ScribeSpanConsumer implements Scribe {
}
} catch (RuntimeException e) {
metrics.incrementMessagesDropped();
- return Futures.immediateFailedFuture(e);
+ resultHandler.onError(e);
+ return;
} finally {
metrics.incrementBytes(byteCount);
}
- SettableFuture<ResultCode> result = SettableFuture.create();
collector.accept(
- spans,
- new Callback<Void>() {
- @Override
- public void onSuccess(@Nullable Void value) {
- result.set(ResultCode.OK);
- }
+ spans,
+ new Callback<Void>() {
+ @Override
+ public void onSuccess(@Nullable Void value) {
+ resultHandler.onComplete(ResultCode.OK);
+ }
- @Override
- public void onError(Throwable t) {
- result.setException(t);
- }
- });
- return result;
+ @Override
+ public void onError(Throwable t) {
+ Exception error = t instanceof Exception ? (Exception) t : new RuntimeException(t);
+ resultHandler.onError(error);
+ }
+ });
}
}
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/LogEntry.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/LogEntry.java
new file mode 100644
index 0000000..395867f
--- /dev/null
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/LogEntry.java
@@ -0,0 +1,482 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package zipkin2.collector.scribe.generated;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-05-07")
+public class LogEntry implements org.apache.thrift.TBase<LogEntry, LogEntry._Fields>, java.io.Serializable, Cloneable, Comparable<LogEntry> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("LogEntry");
+
+ private static final org.apache.thrift.protocol.TField CATEGORY_FIELD_DESC = new org.apache.thrift.protocol.TField("category", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("message", org.apache.thrift.protocol.TType.STRING, (short)2);
+
+ private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new LogEntryStandardSchemeFactory();
+ private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new LogEntryTupleSchemeFactory();
+
+ public @org.apache.thrift.annotation.Nullable java.lang.String category; // required
+ public @org.apache.thrift.annotation.Nullable java.lang.String message; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ CATEGORY((short)1, "category"),
+ MESSAGE((short)2, "message");
+
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ @org.apache.thrift.annotation.Nullable
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // CATEGORY
+ return CATEGORY;
+ case 2: // MESSAGE
+ return MESSAGE;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ @org.apache.thrift.annotation.Nullable
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.CATEGORY, new org.apache.thrift.meta_data.FieldMetaData("category", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("message", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LogEntry.class, metaDataMap);
+ }
+
+ public LogEntry() {
+ }
+
+ public LogEntry(
+ java.lang.String category,
+ java.lang.String message)
+ {
+ this();
+ this.category = category;
+ this.message = message;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public LogEntry(LogEntry other) {
+ if (other.isSetCategory()) {
+ this.category = other.category;
+ }
+ if (other.isSetMessage()) {
+ this.message = other.message;
+ }
+ }
+
+ public LogEntry deepCopy() {
+ return new LogEntry(this);
+ }
+
+ @Override
+ public void clear() {
+ this.category = null;
+ this.message = null;
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.String getCategory() {
+ return this.category;
+ }
+
+ public LogEntry setCategory(@org.apache.thrift.annotation.Nullable java.lang.String category) {
+ this.category = category;
+ return this;
+ }
+
+ public void unsetCategory() {
+ this.category = null;
+ }
+
+ /** Returns true if field category is set (has been assigned a value) and false otherwise */
+ public boolean isSetCategory() {
+ return this.category != null;
+ }
+
+ public void setCategoryIsSet(boolean value) {
+ if (!value) {
+ this.category = null;
+ }
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.String getMessage() {
+ return this.message;
+ }
+
+ public LogEntry setMessage(@org.apache.thrift.annotation.Nullable java.lang.String message) {
+ this.message = message;
+ return this;
+ }
+
+ public void unsetMessage() {
+ this.message = null;
+ }
+
+ /** Returns true if field message is set (has been assigned a value) and false otherwise */
+ public boolean isSetMessage() {
+ return this.message != null;
+ }
+
+ public void setMessageIsSet(boolean value) {
+ if (!value) {
+ this.message = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+ switch (field) {
+ case CATEGORY:
+ if (value == null) {
+ unsetCategory();
+ } else {
+ setCategory((java.lang.String)value);
+ }
+ break;
+
+ case MESSAGE:
+ if (value == null) {
+ unsetMessage();
+ } else {
+ setMessage((java.lang.String)value);
+ }
+ break;
+
+ }
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case CATEGORY:
+ return getCategory();
+
+ case MESSAGE:
+ return getMessage();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case CATEGORY:
+ return isSetCategory();
+ case MESSAGE:
+ return isSetMessage();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof LogEntry)
+ return this.equals((LogEntry)that);
+ return false;
+ }
+
+ public boolean equals(LogEntry that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_category = true && this.isSetCategory();
+ boolean that_present_category = true && that.isSetCategory();
+ if (this_present_category || that_present_category) {
+ if (!(this_present_category && that_present_category))
+ return false;
+ if (!this.category.equals(that.category))
+ return false;
+ }
+
+ boolean this_present_message = true && this.isSetMessage();
+ boolean that_present_message = true && that.isSetMessage();
+ if (this_present_message || that_present_message) {
+ if (!(this_present_message && that_present_message))
+ return false;
+ if (!this.message.equals(that.message))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((isSetCategory()) ? 131071 : 524287);
+ if (isSetCategory())
+ hashCode = hashCode * 8191 + category.hashCode();
+
+ hashCode = hashCode * 8191 + ((isSetMessage()) ? 131071 : 524287);
+ if (isSetMessage())
+ hashCode = hashCode * 8191 + message.hashCode();
+
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(LogEntry other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(isSetCategory()).compareTo(other.isSetCategory());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetCategory()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.category, other.category);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = java.lang.Boolean.valueOf(isSetMessage()).compareTo(other.isSetMessage());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetMessage()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.message, other.message);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("LogEntry(");
+ boolean first = true;
+
+ sb.append("category:");
+ if (this.category == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.category);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("message:");
+ if (this.message == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.message);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class LogEntryStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public LogEntryStandardScheme getScheme() {
+ return new LogEntryStandardScheme();
+ }
+ }
+
+ private static class LogEntryStandardScheme extends org.apache.thrift.scheme.StandardScheme<LogEntry> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, LogEntry struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // CATEGORY
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.category = iprot.readString();
+ struct.setCategoryIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ case 2: // MESSAGE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.message = iprot.readString();
+ struct.setMessageIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, LogEntry struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.category != null) {
+ oprot.writeFieldBegin(CATEGORY_FIELD_DESC);
+ oprot.writeString(struct.category);
+ oprot.writeFieldEnd();
+ }
+ if (struct.message != null) {
+ oprot.writeFieldBegin(MESSAGE_FIELD_DESC);
+ oprot.writeString(struct.message);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class LogEntryTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public LogEntryTupleScheme getScheme() {
+ return new LogEntryTupleScheme();
+ }
+ }
+
+ private static class LogEntryTupleScheme extends org.apache.thrift.scheme.TupleScheme<LogEntry> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, LogEntry struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetCategory()) {
+ optionals.set(0);
+ }
+ if (struct.isSetMessage()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetCategory()) {
+ oprot.writeString(struct.category);
+ }
+ if (struct.isSetMessage()) {
+ oprot.writeString(struct.message);
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, LogEntry struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.category = iprot.readString();
+ struct.setCategoryIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.message = iprot.readString();
+ struct.setMessageIsSet(true);
+ }
+ }
+ }
+
+ private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+ return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+}
+
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/ResultCode.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/ResultCode.java
new file mode 100644
index 0000000..e7f6554
--- /dev/null
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/ResultCode.java
@@ -0,0 +1,43 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package zipkin2.collector.scribe.generated;
+
+
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-05-07")
+public enum ResultCode implements org.apache.thrift.TEnum {
+ OK(0),
+ TRY_LATER(1);
+
+ private final int value;
+
+ private ResultCode(int value) {
+ this.value = value;
+ }
+
+ /**
+ * Get the integer value of this enum value, as defined in the Thrift IDL.
+ */
+ public int getValue() {
+ return value;
+ }
+
+ /**
+ * Find a the enum type by its integer value, as defined in the Thrift IDL.
+ * @return null if the value is not found.
+ */
+ @org.apache.thrift.annotation.Nullable
+ public static ResultCode findByValue(int value) {
+ switch (value) {
+ case 0:
+ return OK;
+ case 1:
+ return TRY_LATER;
+ default:
+ return null;
+ }
+ }
+}
diff --git a/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/Scribe.java b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/Scribe.java
new file mode 100644
index 0000000..56be78c
--- /dev/null
+++ b/zipkin-collector/scribe/src/main/java/zipkin2/collector/scribe/generated/Scribe.java
@@ -0,0 +1,1045 @@
+/**
+ * Autogenerated by Thrift Compiler (0.12.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ * @generated
+ */
+package zipkin2.collector.scribe.generated;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.12.0)", date = "2019-05-07")
+public class Scribe {
+
+ public interface Iface {
+
+ public ResultCode Log(java.util.List<LogEntry> messages) throws org.apache.thrift.TException;
+
+ }
+
+ public interface AsyncIface {
+
+ public void Log(java.util.List<LogEntry> messages, org.apache.thrift.async.AsyncMethodCallback<ResultCode> resultHandler) throws org.apache.thrift.TException;
+
+ }
+
+ public static class Client extends org.apache.thrift.TServiceClient implements Iface {
+ public static class Factory implements org.apache.thrift.TServiceClientFactory<Client> {
+ public Factory() {}
+ public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
+ return new Client(prot);
+ }
+ public Client getClient(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+ return new Client(iprot, oprot);
+ }
+ }
+
+ public Client(org.apache.thrift.protocol.TProtocol prot)
+ {
+ super(prot, prot);
+ }
+
+ public Client(org.apache.thrift.protocol.TProtocol iprot, org.apache.thrift.protocol.TProtocol oprot) {
+ super(iprot, oprot);
+ }
+
+ public ResultCode Log(java.util.List<LogEntry> messages) throws org.apache.thrift.TException
+ {
+ send_Log(messages);
+ return recv_Log();
+ }
+
+ public void send_Log(java.util.List<LogEntry> messages) throws org.apache.thrift.TException
+ {
+ Log_args args = new Log_args();
+ args.setMessages(messages);
+ sendBase("Log", args);
+ }
+
+ public ResultCode recv_Log() throws org.apache.thrift.TException
+ {
+ Log_result result = new Log_result();
+ receiveBase(result, "Log");
+ if (result.isSetSuccess()) {
+ return result.success;
+ }
+ throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, "Log failed: unknown result");
+ }
+
+ }
+ public static class AsyncClient extends org.apache.thrift.async.TAsyncClient implements AsyncIface {
+ public static class Factory implements org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+ private org.apache.thrift.async.TAsyncClientManager clientManager;
+ private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
+ public Factory(org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+ this.clientManager = clientManager;
+ this.protocolFactory = protocolFactory;
+ }
+ public AsyncClient getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+ return new AsyncClient(protocolFactory, clientManager, transport);
+ }
+ }
+
+ public AsyncClient(org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, org.apache.thrift.transport.TNonblockingTransport transport) {
+ super(protocolFactory, clientManager, transport);
+ }
+
+ public void Log(java.util.List<LogEntry> messages, org.apache.thrift.async.AsyncMethodCallback<ResultCode> resultHandler) throws org.apache.thrift.TException {
+ checkReady();
+ Log_call method_call = new Log_call(messages, resultHandler, this, ___protocolFactory, ___transport);
+ this.___currentMethod = method_call;
+ ___manager.call(method_call);
+ }
+
+ public static class Log_call extends org.apache.thrift.async.TAsyncMethodCall<ResultCode> {
+ private java.util.List<LogEntry> messages;
+ public Log_call(java.util.List<LogEntry> messages, org.apache.thrift.async.AsyncMethodCallback<ResultCode> resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException {
+ super(client, protocolFactory, transport, resultHandler, false);
+ this.messages = messages;
+ }
+
+ public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException {
+ prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("Log", org.apache.thrift.protocol.TMessageType.CALL, 0));
+ Log_args args = new Log_args();
+ args.setMessages(messages);
+ args.write(prot);
+ prot.writeMessageEnd();
+ }
+
+ public ResultCode getResult() throws org.apache.thrift.TException {
+ if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+ throw new java.lang.IllegalStateException("Method call not finished!");
+ }
+ org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+ org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport);
+ return (new Client(prot)).recv_Log();
+ }
+ }
+
+ }
+
+ public static class Processor<I extends Iface> extends org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
+ private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(Processor.class.getName());
+ public Processor(I iface) {
+ super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
+ }
+
+ protected Processor(I iface, java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+ super(iface, getProcessMap(processMap));
+ }
+
+ private static <I extends Iface> java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>> processMap) {
+ processMap.put("Log", new Log());
+ return processMap;
+ }
+
+ public static class Log<I extends Iface> extends org.apache.thrift.ProcessFunction<I, Log_args> {
+ public Log() {
+ super("Log");
+ }
+
+ public Log_args getEmptyArgsInstance() {
+ return new Log_args();
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ @Override
+ protected boolean rethrowUnhandledExceptions() {
+ return false;
+ }
+
+ public Log_result getResult(I iface, Log_args args) throws org.apache.thrift.TException {
+ Log_result result = new Log_result();
+ result.success = iface.Log(args.messages);
+ return result;
+ }
+ }
+
+ }
+
+ public static class AsyncProcessor<I extends AsyncIface> extends org.apache.thrift.TBaseAsyncProcessor<I> {
+ private static final org.slf4j.Logger _LOGGER = org.slf4j.LoggerFactory.getLogger(AsyncProcessor.class.getName());
+ public AsyncProcessor(I iface) {
+ super(iface, getProcessMap(new java.util.HashMap<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>>()));
+ }
+
+ protected AsyncProcessor(I iface, java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
+ super(iface, getProcessMap(processMap));
+ }
+
+ private static <I extends AsyncIface> java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase,?>> getProcessMap(java.util.Map<java.lang.String, org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, ?>> processMap) {
+ processMap.put("Log", new Log());
+ return processMap;
+ }
+
+ public static class Log<I extends AsyncIface> extends org.apache.thrift.AsyncProcessFunction<I, Log_args, ResultCode> {
+ public Log() {
+ super("Log");
+ }
+
+ public Log_args getEmptyArgsInstance() {
+ return new Log_args();
+ }
+
+ public org.apache.thrift.async.AsyncMethodCallback<ResultCode> getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) {
+ final org.apache.thrift.AsyncProcessFunction fcall = this;
+ return new org.apache.thrift.async.AsyncMethodCallback<ResultCode>() {
+ public void onComplete(ResultCode o) {
+ Log_result result = new Log_result();
+ result.success = o;
+ try {
+ fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+ } catch (org.apache.thrift.transport.TTransportException e) {
+ _LOGGER.error("TTransportException writing to internal frame buffer", e);
+ fb.close();
+ } catch (java.lang.Exception e) {
+ _LOGGER.error("Exception writing to internal frame buffer", e);
+ onError(e);
+ }
+ }
+ public void onError(java.lang.Exception e) {
+ byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+ org.apache.thrift.TSerializable msg;
+ Log_result result = new Log_result();
+ if (e instanceof org.apache.thrift.transport.TTransportException) {
+ _LOGGER.error("TTransportException inside handler", e);
+ fb.close();
+ return;
+ } else if (e instanceof org.apache.thrift.TApplicationException) {
+ _LOGGER.error("TApplicationException inside handler", e);
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = (org.apache.thrift.TApplicationException)e;
+ } else {
+ _LOGGER.error("Exception inside handler", e);
+ msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+ msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage());
+ }
+ try {
+ fcall.sendResponse(fb,msg,msgType,seqid);
+ } catch (java.lang.Exception ex) {
+ _LOGGER.error("Exception writing to internal frame buffer", ex);
+ fb.close();
+ }
+ }
+ };
+ }
+
+ protected boolean isOneway() {
+ return false;
+ }
+
+ public void start(I iface, Log_args args, org.apache.thrift.async.AsyncMethodCallback<ResultCode> resultHandler) throws org.apache.thrift.TException {
+ iface.Log(args.messages,resultHandler);
+ }
+ }
+
+ }
+
+ public static class Log_args implements org.apache.thrift.TBase<Log_args, Log_args._Fields>, java.io.Serializable, Cloneable, Comparable<Log_args> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Log_args");
+
+ private static final org.apache.thrift.protocol.TField MESSAGES_FIELD_DESC = new org.apache.thrift.protocol.TField("messages", org.apache.thrift.protocol.TType.LIST, (short)1);
+
+ private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new Log_argsStandardSchemeFactory();
+ private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new Log_argsTupleSchemeFactory();
+
+ public @org.apache.thrift.annotation.Nullable java.util.List<LogEntry> messages; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ MESSAGES((short)1, "messages");
+
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ @org.apache.thrift.annotation.Nullable
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // MESSAGES
+ return MESSAGES;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ @org.apache.thrift.annotation.Nullable
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.MESSAGES, new org.apache.thrift.meta_data.FieldMetaData("messages", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, LogEntry.class))));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Log_args.class, metaDataMap);
+ }
+
+ public Log_args() {
+ }
+
+ public Log_args(
+ java.util.List<LogEntry> messages)
+ {
+ this();
+ this.messages = messages;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public Log_args(Log_args other) {
+ if (other.isSetMessages()) {
+ java.util.List<LogEntry> __this__messages = new java.util.ArrayList<LogEntry>(other.messages.size());
+ for (LogEntry other_element : other.messages) {
+ __this__messages.add(new LogEntry(other_element));
+ }
+ this.messages = __this__messages;
+ }
+ }
+
+ public Log_args deepCopy() {
+ return new Log_args(this);
+ }
+
+ @Override
+ public void clear() {
+ this.messages = null;
+ }
+
+ public int getMessagesSize() {
+ return (this.messages == null) ? 0 : this.messages.size();
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.util.Iterator<LogEntry> getMessagesIterator() {
+ return (this.messages == null) ? null : this.messages.iterator();
+ }
+
+ public void addToMessages(LogEntry elem) {
+ if (this.messages == null) {
+ this.messages = new java.util.ArrayList<LogEntry>();
+ }
+ this.messages.add(elem);
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.util.List<LogEntry> getMessages() {
+ return this.messages;
+ }
+
+ public Log_args setMessages(@org.apache.thrift.annotation.Nullable java.util.List<LogEntry> messages) {
+ this.messages = messages;
+ return this;
+ }
+
+ public void unsetMessages() {
+ this.messages = null;
+ }
+
+ /** Returns true if field messages is set (has been assigned a value) and false otherwise */
+ public boolean isSetMessages() {
+ return this.messages != null;
+ }
+
+ public void setMessagesIsSet(boolean value) {
+ if (!value) {
+ this.messages = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+ switch (field) {
+ case MESSAGES:
+ if (value == null) {
+ unsetMessages();
+ } else {
+ setMessages((java.util.List<LogEntry>)value);
+ }
+ break;
+
+ }
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case MESSAGES:
+ return getMessages();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case MESSAGES:
+ return isSetMessages();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof Log_args)
+ return this.equals((Log_args)that);
+ return false;
+ }
+
+ public boolean equals(Log_args that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_messages = true && this.isSetMessages();
+ boolean that_present_messages = true && that.isSetMessages();
+ if (this_present_messages || that_present_messages) {
+ if (!(this_present_messages && that_present_messages))
+ return false;
+ if (!this.messages.equals(that.messages))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((isSetMessages()) ? 131071 : 524287);
+ if (isSetMessages())
+ hashCode = hashCode * 8191 + messages.hashCode();
+
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(Log_args other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(isSetMessages()).compareTo(other.isSetMessages());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetMessages()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.messages, other.messages);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("Log_args(");
+ boolean first = true;
+
+ sb.append("messages:");
+ if (this.messages == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.messages);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class Log_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public Log_argsStandardScheme getScheme() {
+ return new Log_argsStandardScheme();
+ }
+ }
+
+ private static class Log_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme<Log_args> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, Log_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 1: // MESSAGES
+ if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list0 = iprot.readListBegin();
+ struct.messages = new java.util.ArrayList<LogEntry>(_list0.size);
+ @org.apache.thrift.annotation.Nullable LogEntry _elem1;
+ for (int _i2 = 0; _i2 < _list0.size; ++_i2)
+ {
+ _elem1 = new LogEntry();
+ _elem1.read(iprot);
+ struct.messages.add(_elem1);
+ }
+ iprot.readListEnd();
+ }
+ struct.setMessagesIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, Log_args struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.messages != null) {
+ oprot.writeFieldBegin(MESSAGES_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.messages.size()));
+ for (LogEntry _iter3 : struct.messages)
+ {
+ _iter3.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class Log_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public Log_argsTupleScheme getScheme() {
+ return new Log_argsTupleScheme();
+ }
+ }
+
+ private static class Log_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme<Log_args> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, Log_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetMessages()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetMessages()) {
+ {
+ oprot.writeI32(struct.messages.size());
+ for (LogEntry _iter4 : struct.messages)
+ {
+ _iter4.write(oprot);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, Log_args struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ {
+ org.apache.thrift.protocol.TList _list5 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.messages = new java.util.ArrayList<LogEntry>(_list5.size);
+ @org.apache.thrift.annotation.Nullable LogEntry _elem6;
+ for (int _i7 = 0; _i7 < _list5.size; ++_i7)
+ {
+ _elem6 = new LogEntry();
+ _elem6.read(iprot);
+ struct.messages.add(_elem6);
+ }
+ }
+ struct.setMessagesIsSet(true);
+ }
+ }
+ }
+
+ private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+ return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+ }
+
+ public static class Log_result implements org.apache.thrift.TBase<Log_result, Log_result._Fields>, java.io.Serializable, Cloneable, Comparable<Log_result> {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Log_result");
+
+ private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.I32, (short)0);
+
+ private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new Log_resultStandardSchemeFactory();
+ private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new Log_resultTupleSchemeFactory();
+
+ /**
+ *
+ * @see ResultCode
+ */
+ public @org.apache.thrift.annotation.Nullable ResultCode success; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ /**
+ *
+ * @see ResultCode
+ */
+ SUCCESS((short)0, "success");
+
+ private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
+
+ static {
+ for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ @org.apache.thrift.annotation.Nullable
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 0: // SUCCESS
+ return SUCCESS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ @org.apache.thrift.annotation.Nullable
+ public static _Fields findByName(java.lang.String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final java.lang.String _fieldName;
+
+ _Fields(short thriftId, java.lang.String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public java.lang.String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, ResultCode.class)));
+ metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Log_result.class, metaDataMap);
+ }
+
+ public Log_result() {
+ }
+
+ public Log_result(
+ ResultCode success)
+ {
+ this();
+ this.success = success;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public Log_result(Log_result other) {
+ if (other.isSetSuccess()) {
+ this.success = other.success;
+ }
+ }
+
+ public Log_result deepCopy() {
+ return new Log_result(this);
+ }
+
+ @Override
+ public void clear() {
+ this.success = null;
+ }
+
+ /**
+ *
+ * @see ResultCode
+ */
+ @org.apache.thrift.annotation.Nullable
+ public ResultCode getSuccess() {
+ return this.success;
+ }
+
+ /**
+ *
+ * @see ResultCode
+ */
+ public Log_result setSuccess(@org.apache.thrift.annotation.Nullable ResultCode success) {
+ this.success = success;
+ return this;
+ }
+
+ public void unsetSuccess() {
+ this.success = null;
+ }
+
+ /** Returns true if field success is set (has been assigned a value) and false otherwise */
+ public boolean isSetSuccess() {
+ return this.success != null;
+ }
+
+ public void setSuccessIsSet(boolean value) {
+ if (!value) {
+ this.success = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
+ switch (field) {
+ case SUCCESS:
+ if (value == null) {
+ unsetSuccess();
+ } else {
+ setSuccess((ResultCode)value);
+ }
+ break;
+
+ }
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.Object getFieldValue(_Fields field) {
+ switch (field) {
+ case SUCCESS:
+ return getSuccess();
+
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new java.lang.IllegalArgumentException();
+ }
+
+ switch (field) {
+ case SUCCESS:
+ return isSetSuccess();
+ }
+ throw new java.lang.IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(java.lang.Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof Log_result)
+ return this.equals((Log_result)that);
+ return false;
+ }
+
+ public boolean equals(Log_result that) {
+ if (that == null)
+ return false;
+ if (this == that)
+ return true;
+
+ boolean this_present_success = true && this.isSetSuccess();
+ boolean that_present_success = true && that.isSetSuccess();
+ if (this_present_success || that_present_success) {
+ if (!(this_present_success && that_present_success))
+ return false;
+ if (!this.success.equals(that.success))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int hashCode = 1;
+
+ hashCode = hashCode * 8191 + ((isSetSuccess()) ? 131071 : 524287);
+ if (isSetSuccess())
+ hashCode = hashCode * 8191 + success.getValue();
+
+ return hashCode;
+ }
+
+ @Override
+ public int compareTo(Log_result other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+
+ lastComparison = java.lang.Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetSuccess()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ @org.apache.thrift.annotation.Nullable
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ scheme(iprot).read(iprot, this);
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ scheme(oprot).write(oprot, this);
+ }
+
+ @Override
+ public java.lang.String toString() {
+ java.lang.StringBuilder sb = new java.lang.StringBuilder("Log_result(");
+ boolean first = true;
+
+ sb.append("success:");
+ if (this.success == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.success);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ // check for sub-struct validity
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private static class Log_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public Log_resultStandardScheme getScheme() {
+ return new Log_resultStandardScheme();
+ }
+ }
+
+ private static class Log_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme<Log_result> {
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot, Log_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField schemeField;
+ iprot.readStructBegin();
+ while (true)
+ {
+ schemeField = iprot.readFieldBegin();
+ if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (schemeField.id) {
+ case 0: // SUCCESS
+ if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+ struct.success = ResultCode.findByValue(iprot.readI32());
+ struct.setSuccessIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+
+ // check for required fields of primitive type, which can't be checked in the validate method
+ struct.validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot, Log_result struct) throws org.apache.thrift.TException {
+ struct.validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (struct.success != null) {
+ oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
+ oprot.writeI32(struct.success.getValue());
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ }
+
+ private static class Log_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory {
+ public Log_resultTupleScheme getScheme() {
+ return new Log_resultTupleScheme();
+ }
+ }
+
+ private static class Log_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme<Log_result> {
+
+ @Override
+ public void write(org.apache.thrift.protocol.TProtocol prot, Log_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetSuccess()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetSuccess()) {
+ oprot.writeI32(struct.success.getValue());
+ }
+ }
+
+ @Override
+ public void read(org.apache.thrift.protocol.TProtocol prot, Log_result struct) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
+ java.util.BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.success = ResultCode.findByValue(iprot.readI32());
+ struct.setSuccessIsSet(true);
+ }
+ }
+ }
+
+ private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) {
+ return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme();
+ }
+ }
+
+}
diff --git a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ITScribeCollector.java b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ITScribeCollector.java
new file mode 100644
index 0000000..8274cc1
--- /dev/null
+++ b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ITScribeCollector.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package zipkin2.collector.scribe;
+
+import java.util.Base64;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import zipkin2.Callback;
+import zipkin2.Span;
+import zipkin2.TestObjects;
+import zipkin2.codec.SpanBytesEncoder;
+import zipkin2.collector.Collector;
+import zipkin2.collector.CollectorMetrics;
+import zipkin2.collector.scribe.generated.LogEntry;
+import zipkin2.collector.scribe.generated.ResultCode;
+import zipkin2.collector.scribe.generated.Scribe;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class ITScribeCollector {
+
+ private static Collector collector;
+ private static CollectorMetrics metrics;
+
+ private static NettyScribeServer server;
+
+ @BeforeClass
+ public static void startServer() {
+ collector = mock(Collector.class);
+ doAnswer(invocation -> {
+ Callback<Void> callback = invocation.getArgument(1);
+ callback.onSuccess(null);
+ return null;
+ }).when(collector).accept(any(), any());
+
+ metrics = mock(CollectorMetrics.class);
+
+ server = new NettyScribeServer(0, new ScribeSpanConsumer(collector, metrics, "zipkin"));
+ server.start();
+ }
+
+ @AfterClass
+ public static void stopServer() {
+ server.close();
+ }
+
+ @Test
+ public void normal() throws Exception {
+ // Java version of this sample code
+ // https://github.com/facebookarchive/scribe/wiki/Logging-Messages
+ TTransport transport = new TFramedTransport(new TSocket("localhost", server.port()));
+ TProtocol protocol = new TBinaryProtocol(transport, false, false);
+ Scribe.Iface client = new Scribe.Client(protocol);
+
+ List<LogEntry> entries = TestObjects.TRACE.stream()
+ .map(ITScribeCollector::logEntry)
+ .collect(Collectors.toList());
+
+ transport.open();
+ try {
+ ResultCode code = client.Log(entries);
+ assertThat(code).isEqualTo(ResultCode.OK);
+
+ code = client.Log(entries);
+ assertThat(code).isEqualTo(ResultCode.OK);
+ } finally {
+ transport.close();
+ }
+
+ verify(collector, times(2)).accept(eq(TestObjects.TRACE), any());
+ verify(metrics, times(2)).incrementMessages();
+ }
+
+ private static LogEntry logEntry(Span span) {
+ return new LogEntry()
+ .setCategory("zipkin")
+ .setMessage(Base64.getMimeEncoder().encodeToString(SpanBytesEncoder.THRIFT.encode(span)));
+ }
+}
diff --git a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeCollectorTest.java b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeCollectorTest.java
index 26e085f..ca53997 100644
--- a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeCollectorTest.java
+++ b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeCollectorTest.java
@@ -16,7 +16,6 @@
*/
package zipkin2.collector.scribe;
-import org.jboss.netty.channel.ChannelException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -45,8 +44,8 @@ public class ScribeCollectorTest {
@Test
public void start_failsWhenCantBindPort() {
- thrown.expect(ChannelException.class);
- thrown.expectMessage("Failed to bind to: 0.0.0.0/0.0.0.0:12345");
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("Could not start scribe server.");
ScribeCollector.Builder builder = ScribeCollector.newBuilder().storage(storage).port(12345);
diff --git a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
index 567b87f..fdea0bc 100644
--- a/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
+++ b/zipkin-collector/scribe/src/test/java/zipkin2/collector/scribe/ScribeSpanConsumerTest.java
@@ -18,7 +18,7 @@ package zipkin2.collector.scribe;
import java.util.Arrays;
import java.util.Base64;
-import java.util.concurrent.ExecutionException;
+import org.apache.thrift.async.AsyncMethodCallback;
import org.junit.Test;
import zipkin2.Call;
import zipkin2.Callback;
@@ -27,6 +27,8 @@ import zipkin2.Endpoint;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.collector.InMemoryCollectorMetrics;
+import zipkin2.collector.scribe.generated.LogEntry;
+import zipkin2.collector.scribe.generated.ResultCode;
import zipkin2.storage.InMemoryStorage;
import zipkin2.storage.SpanConsumer;
import zipkin2.storage.SpanStore;
@@ -34,10 +36,9 @@ import zipkin2.storage.StorageComponent;
import zipkin2.v1.V1Span;
import zipkin2.v1.V1SpanConverter;
-import static com.google.common.base.Charsets.UTF_8;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
public class ScribeSpanConsumerTest {
// scope to scribe as we aren't creating the consumer with the builder.
@@ -46,6 +47,20 @@ public class ScribeSpanConsumerTest {
InMemoryStorage storage = InMemoryStorage.newBuilder().build();
SpanConsumer consumer = storage.spanConsumer();
+ static class CaptureAsyncMethodCallback implements AsyncMethodCallback<ResultCode> {
+
+ ResultCode resultCode;
+ Exception error;
+
+ @Override public void onComplete(ResultCode resultCode) {
+ this.resultCode = resultCode;
+ }
+
+ @Override public void onError(Exception error) {
+ this.error = error;
+ }
+ }
+
static String reallyLongAnnotation;
static {
@@ -85,11 +100,11 @@ public class ScribeSpanConsumerTest {
public void entriesWithSpansAreConsumed() throws Exception {
ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
- Scribe.LogEntry entry = new Scribe.LogEntry();
+ LogEntry entry = new LogEntry();
entry.category = "zipkin";
entry.message = encodedSpan;
- assertThat(scribe.log(asList(entry)).get()).isEqualTo(Scribe.ResultCode.OK);
+ expectSuccess(scribe, entry);
assertThat(storage.getTraces()).containsExactly(asList(v2));
@@ -108,11 +123,11 @@ public class ScribeSpanConsumerTest {
ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
- Scribe.LogEntry entry = new Scribe.LogEntry();
+ LogEntry entry = new LogEntry();
entry.category = "notzipkin";
entry.message = "hello world";
- scribe.log(asList(entry)).get();
+ expectSuccess(scribe, entry);
assertThat(scribeMetrics.messages()).isEqualTo(1);
assertThat(scribeMetrics.messagesDropped()).isZero();
@@ -121,20 +136,23 @@ public class ScribeSpanConsumerTest {
assertThat(scribeMetrics.spansDropped()).isZero();
}
+ private void expectSuccess(ScribeSpanConsumer scribe, LogEntry entry) {
+ CaptureAsyncMethodCallback callback = new CaptureAsyncMethodCallback();
+ scribe.Log(asList(entry), callback);
+ assertThat(callback.resultCode).isEqualTo(ResultCode.OK);
+ }
+
@Test
public void malformedDataIsDropped() throws Exception {
ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
- Scribe.LogEntry entry = new Scribe.LogEntry();
+ LogEntry entry = new LogEntry();
entry.category = "zipkin";
entry.message = "notbase64";
- try {
- scribe.log(asList(entry)).get();
- failBecauseExceptionWasNotThrown(ExecutionException.class);
- } catch (ExecutionException e) {
- assertThat(e.getCause()).isInstanceOf(IllegalArgumentException.class);
- }
+ CaptureAsyncMethodCallback callback = new CaptureAsyncMethodCallback();
+ scribe.Log(asList(entry), callback);
+ assertThat(callback.error).isInstanceOf(IllegalArgumentException.class);
assertThat(scribeMetrics.messages()).isEqualTo(1);
assertThat(scribeMetrics.messagesDropped()).isEqualTo(1);
@@ -151,16 +169,13 @@ public class ScribeSpanConsumerTest {
ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
- Scribe.LogEntry entry = new Scribe.LogEntry();
+ LogEntry entry = new LogEntry();
entry.category = "zipkin";
entry.message = encodedSpan;
- try {
- scribe.log(asList(entry)).get();
- failBecauseExceptionWasNotThrown(ExecutionException.class);
- } catch (ExecutionException e) {
- assertThat(e.getCause()).hasMessage("endpoint was null");
- }
+ CaptureAsyncMethodCallback callback = new CaptureAsyncMethodCallback();
+ scribe.Log(asList(entry), callback);
+ assertThat(callback.error).hasMessage("endpoint was null");
assertThat(scribeMetrics.messages()).isEqualTo(1);
assertThat(scribeMetrics.messagesDropped()).isZero();
@@ -192,11 +207,11 @@ public class ScribeSpanConsumerTest {
ScribeSpanConsumer scribe = newScribeSpanConsumer("zipkin", consumer);
- Scribe.LogEntry entry = new Scribe.LogEntry();
+ LogEntry entry = new LogEntry();
entry.category = "zipkin";
entry.message = encodedSpan;
- scribe.log(asList(entry)).get();
+ expectSuccess(scribe, entry);
assertThat(scribeMetrics.messages()).isEqualTo(1);
assertThat(scribeMetrics.messagesDropped()).isZero();
@@ -208,13 +223,15 @@ public class ScribeSpanConsumerTest {
/** Finagle's zipkin tracer breaks on a column width with a trailing newline */
@Test
public void decodesSpanGeneratedByFinagle() throws Exception {
- Scribe.LogEntry entry = new Scribe.LogEntry();
+ LogEntry entry = new LogEntry();
entry.category = "zipkin";
entry.message =
"CgABq/sBMnzE048LAAMAAAAOZ2V0VHJhY2VzQnlJZHMKAATN0p+4EGfTdAoABav7ATJ8xNOPDwAGDAAAAAQKAAEABR/wq+2DeAsAAgAAAAJzcgwAAwgAAX8AAAEGAAIkwwsAAwAAAAx6aXBraW4tcXVlcnkAAAoAAQAFH/Cr7zj4CwACAAAIAGFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWF [...]
+ "YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhY [...]
- newScribeSpanConsumer(entry.category, consumer).log(asList(entry)).get();
+ ScribeSpanConsumer scribe = newScribeSpanConsumer(entry.category, consumer);
+
+ expectSuccess(scribe, entry);
assertThat(storage.getTraces()).containsExactly(asList(v2));
@@ -227,26 +244,29 @@ public class ScribeSpanConsumerTest {
}
ScribeSpanConsumer newScribeSpanConsumer(String category, SpanConsumer consumer) {
+ ScribeCollector.Builder builder = ScribeCollector.newBuilder()
+ .category(category)
+ .metrics(scribeMetrics)
+ .storage(new StorageComponent() {
+ @Override public SpanStore spanStore() {
+ throw new AssertionError();
+ }
+
+ @Override public SpanConsumer spanConsumer() {
+ return consumer;
+ }
+
+ @Override public CheckResult check() {
+ return CheckResult.OK;
+ }
+
+ @Override public void close() {
+ throw new AssertionError();
+ }
+ });
return new ScribeSpanConsumer(
- ScribeCollector.newBuilder()
- .category(category)
- .metrics(scribeMetrics)
- .storage(new StorageComponent() {
- @Override public SpanStore spanStore() {
- throw new AssertionError();
- }
-
- @Override public SpanConsumer spanConsumer() {
- return consumer;
- }
-
- @Override public CheckResult check() {
- return CheckResult.OK;
- }
-
- @Override public void close() {
- throw new AssertionError();
- }
- }));
+ builder.delegate.build(),
+ builder.metrics,
+ builder.category);
}
}