You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/23 10:10:48 UTC
[50/50] [abbrv] hadoop git commit: HDFS-8515. Implement HTTP/2 stream
channels. Contributed by Duo Zhang.
HDFS-8515. Implement HTTP/2 stream channels. Contributed by Duo Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d7e4ad4a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d7e4ad4a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d7e4ad4a
Branch: refs/heads/HDFS-7966
Commit: d7e4ad4ad00b2026171506fba11e1f5d7b51c6cb
Parents: cc2b473
Author: Haohui Mai <wh...@apache.org>
Authored: Wed Jun 24 18:07:17 2015 -0700
Committer: zhangduo <zh...@wandoujia.com>
Committed: Wed Sep 23 11:06:41 2015 +0800
----------------------------------------------------------------------
.gitignore | 2 +
.../hadoop-hdfs/CHANGES-HDFS-7966.txt | 3 +
hadoop-hdfs-project/hadoop-hdfs/pom.xml | 5 +
.../web/PortUnificationServerHandler.java | 27 +-
.../datanode/web/dtp/DtpChannelHandler.java | 47 ++++
.../datanode/web/dtp/DtpHttp2FrameListener.java | 52 ----
.../datanode/web/dtp/DtpHttp2Handler.java | 34 ---
.../hdfs/web/http2/Http2StreamChannel.java | 268 +++++++++++++++++++
.../hadoop/hdfs/web/http2/LastHttp2Message.java | 44 +++
.../web/http2/ServerHttp2ConnectionHandler.java | 86 ++++++
.../web/http2/ServerHttp2EventListener.java | 135 ++++++++++
.../datanode/web/dtp/Http2ResponseHandler.java | 14 +-
.../server/datanode/web/dtp/TestDtpHttp2.java | 6 +-
.../hdfs/web/http2/AbstractTestHttp2Server.java | 67 +++++
.../hadoop/hdfs/web/http2/StreamListener.java | 116 ++++++++
.../hadoop/hdfs/web/http2/TestHttp2Server.java | 140 ++++++++++
.../web/http2/TestHttp2ServerMultiThread.java | 207 ++++++++++++++
hadoop-project/pom.xml | 7 +
18 files changed, 1154 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index cde198e..0d664ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,5 @@ yarnregistry.pdf
hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml
hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
patchprocess/
+hadoop-hdfs-project/hadoop-hdfs/src/test/resources/common-version-info.properties
+hadoop-hdfs-project/hadoop-hdfs/src/test/resources/webapps
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt
new file mode 100644
index 0000000..4ec2793
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-7966.txt
@@ -0,0 +1,3 @@
+HDFS-7966 AND RELATED CHANGES
+
+ HDFS-8515. Implement HTTP/2 stream channels. (Duo Zhang via wheat9)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index d0c2dc7..a20bf1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -196,6 +196,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<artifactId>htrace-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.http2</groupId>
+ <artifactId>http2-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
<classifier>classes</classifier>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
index 7ebc070..e5c5256 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/PortUnificationServerHandler.java
@@ -17,21 +17,24 @@
*/
package org.apache.hadoop.hdfs.server.datanode.web;
-import java.net.InetSocketAddress;
-import java.util.List;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpHttp2Handler;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.stream.ChunkedWriteHandler;
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.web.dtp.DtpChannelHandler;
+import org.apache.hadoop.hdfs.web.http2.Http2StreamChannel;
+import org.apache.hadoop.hdfs.web.http2.ServerHttp2ConnectionHandler;
+
/**
* A port unification handler to support HTTP/1.1 and HTTP/2 on the same port.
*/
@@ -64,7 +67,15 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
}
private void configureHttp2(ChannelHandlerContext ctx) {
- ctx.pipeline().addLast(new DtpHttp2Handler());
+ ctx.pipeline().addLast(
+ ServerHttp2ConnectionHandler.create(ctx.channel(),
+ new ChannelInitializer<Http2StreamChannel>() {
+
+ @Override
+ protected void initChannel(Http2StreamChannel ch) throws Exception {
+ ch.pipeline().addLast(new DtpChannelHandler());
+ }
+ }));
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java
new file mode 100644
index 0000000..23847c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpChannelHandler.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.web.dtp;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.web.http2.LastHttp2Message;
+
+/**
+ * A dummy handler that just write back a string message.
+ */
+@InterfaceAudience.Private
+public class DtpChannelHandler extends
+ SimpleChannelInboundHandler<Http2Headers> {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Http2Headers msg)
+ throws Exception {
+ ctx.write(new DefaultHttp2Headers().status(HttpResponseStatus.OK
+ .codeAsText()));
+ ctx.write(ctx.alloc().buffer()
+ .writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)));
+ ctx.writeAndFlush(LastHttp2Message.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java
deleted file mode 100644
index 41e7cf4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2FrameListener.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.web.dtp;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http2.DefaultHttp2Headers;
-import io.netty.handler.codec.http2.Http2ConnectionEncoder;
-import io.netty.handler.codec.http2.Http2Exception;
-import io.netty.handler.codec.http2.Http2FrameAdapter;
-import io.netty.handler.codec.http2.Http2Headers;
-
-import java.nio.charset.StandardCharsets;
-
-class DtpHttp2FrameListener extends Http2FrameAdapter {
-
- private Http2ConnectionEncoder encoder;
-
- public void encoder(Http2ConnectionEncoder encoder) {
- this.encoder = encoder;
- }
-
- @Override
- public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
- Http2Headers headers, int streamDependency, short weight,
- boolean exclusive, int padding, boolean endStream) throws Http2Exception {
- encoder.writeHeaders(ctx, streamId,
- new DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText()), 0,
- false, ctx.newPromise());
- encoder.writeData(
- ctx,
- streamId,
- ctx.alloc().buffer()
- .writeBytes("HTTP/2 DTP".getBytes(StandardCharsets.UTF_8)), 0, true,
- ctx.newPromise());
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java
deleted file mode 100644
index 5b6f279..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/DtpHttp2Handler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.web.dtp;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import io.netty.handler.codec.http2.Http2ConnectionHandler;
-
-/**
- * The HTTP/2 handler.
- */
-@InterfaceAudience.Private
-public class DtpHttp2Handler extends Http2ConnectionHandler {
-
- public DtpHttp2Handler() {
- super(true, new DtpHttp2FrameListener());
- ((DtpHttp2FrameListener) decoder().listener()).encoder(encoder());
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
new file mode 100644
index 0000000..658ffe4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/Http2StreamChannel.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.AbstractChannel;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelMetadata;
+import io.netty.channel.ChannelOutboundBuffer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelConfig;
+import io.netty.channel.EventLoop;
+import io.netty.handler.codec.UnsupportedMessageTypeException;
+import io.netty.handler.codec.http2.Http2ConnectionEncoder;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.InternalThreadLocalMap;
+
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * A channel used for modeling an HTTP/2 stream.
+ * <p>
+ * We share the same event loop with the parent channel, so doBeginRead, doWrite
+ * and doClose will run in the same event loop thread. So no event loop
+ * switching is needed, and it is safe to call encoder.writeXXX directly in
+ * doWrite.
+ * <p>
+ * But the public methods(isOpen, isActive...) can be called outside the event
+ * loop, so the state field must be volatile.
+ */
+@InterfaceAudience.Private
+public class Http2StreamChannel extends AbstractChannel {
+
+ private static final ChannelMetadata METADATA = new ChannelMetadata(false);
+
+ private static final int MAX_READER_STACK_DEPTH = 8;
+
+ private final ChannelHandlerContext http2ConnHandlerCtx;
+ private final Http2Stream stream;
+ private final Http2ConnectionEncoder encoder;
+ private final DefaultChannelConfig config;
+ private final Queue<Object> inboundMessageQueue = new ArrayDeque<>();
+
+ private enum State {
+ OPEN, HALF_CLOSED_LOCAL, HALF_CLOSED_REMOTE, PRE_CLOSED, CLOSED
+ }
+
+ private volatile State state = State.OPEN;
+
+ public Http2StreamChannel(Channel parent, Http2Stream stream) {
+ super(parent);
+ this.http2ConnHandlerCtx =
+ parent.pipeline().context(Http2ConnectionHandler.class);
+ Http2ConnectionHandler connHandler =
+ (Http2ConnectionHandler) http2ConnHandlerCtx.handler();
+ this.stream = stream;
+ this.encoder = connHandler.encoder();
+ this.config = new DefaultChannelConfig(this);
+ }
+
+ @Override
+ public ChannelConfig config() {
+ return config;
+ }
+
+ @Override
+ public boolean isOpen() {
+ return state != State.CLOSED;
+ }
+
+ @Override
+ public boolean isActive() {
+ // we create this channel after HTTP/2 stream active, so we do not have a
+ // separated 'active' state.
+ return isOpen();
+ }
+
+ @Override
+ public ChannelMetadata metadata() {
+ return METADATA;
+ }
+
+ private final class Http2Unsafe extends AbstractUnsafe {
+
+ @Override
+ public void connect(SocketAddress remoteAddress,
+ SocketAddress localAddress, ChannelPromise promise) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Override
+ protected AbstractUnsafe newUnsafe() {
+ return new Http2Unsafe();
+ }
+
+ @Override
+ protected boolean isCompatible(EventLoop loop) {
+ return true;
+ }
+
+ @Override
+ protected SocketAddress localAddress0() {
+ return parent().localAddress();
+ }
+
+ @Override
+ protected SocketAddress remoteAddress0() {
+ return parent().remoteAddress();
+ }
+
+ @Override
+ protected void doBind(SocketAddress localAddress) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void doDisconnect() throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected void doClose() throws Exception {
+ if (stream.state() != Http2Stream.State.CLOSED) {
+ encoder.writeRstStream(http2ConnHandlerCtx, stream.id(),
+ Http2Error.INTERNAL_ERROR.code(), http2ConnHandlerCtx.newPromise());
+ }
+ state = State.CLOSED;
+ }
+
+ private final Runnable readTask = new Runnable() {
+
+ @Override
+ public void run() {
+ ChannelPipeline pipeline = pipeline();
+ int maxMessagesPerRead = config().getMaxMessagesPerRead();
+ for (int i = 0; i < maxMessagesPerRead; i++) {
+ Object m = inboundMessageQueue.poll();
+ if (m == null) {
+ break;
+ }
+ if (m == LastHttp2Message.get()) {
+ state =
+ state == State.HALF_CLOSED_LOCAL ? State.PRE_CLOSED
+ : State.HALF_CLOSED_REMOTE;
+ }
+ pipeline.fireChannelRead(m);
+ }
+ pipeline.fireChannelReadComplete();
+ }
+ };
+
+ @Override
+ protected void doBeginRead() throws Exception {
+ State currentState = this.state;
+ if (currentState == State.CLOSED) {
+ throw new ClosedChannelException();
+ }
+ if (inboundMessageQueue.isEmpty()) {
+ return;
+ }
+ final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
+ final Integer stackDepth = threadLocals.localChannelReaderStackDepth();
+ if (stackDepth < MAX_READER_STACK_DEPTH) {
+ threadLocals.setLocalChannelReaderStackDepth(stackDepth + 1);
+ try {
+ readTask.run();
+ } finally {
+ threadLocals.setLocalChannelReaderStackDepth(stackDepth);
+ }
+ } else {
+ eventLoop().execute(readTask);
+ }
+ }
+
+ @Override
+ protected void doWrite(ChannelOutboundBuffer in) throws Exception {
+ State currentState = this.state;
+ if (currentState == State.CLOSED) {
+ throw new ClosedChannelException();
+ }
+ boolean flush = false;
+ for (;;) {
+ Object msg = in.current();
+ if (msg == null) {
+ break;
+ }
+ if (msg == LastHttp2Message.get()) {
+ this.state =
+ currentState == State.HALF_CLOSED_REMOTE ? State.PRE_CLOSED
+ : State.HALF_CLOSED_LOCAL;
+ encoder.writeData(http2ConnHandlerCtx, stream.id(), http2ConnHandlerCtx
+ .alloc().buffer(0), 0, true, http2ConnHandlerCtx.newPromise());
+ } else if (msg instanceof Http2Headers) {
+ encoder.writeHeaders(http2ConnHandlerCtx, stream.id(),
+ (Http2Headers) msg, 0, false, http2ConnHandlerCtx.newPromise());
+ } else if (msg instanceof ByteBuf) {
+ ByteBuf data = (ByteBuf) msg;
+ encoder.writeData(http2ConnHandlerCtx, stream.id(), data.retain(), 0,
+ false, http2ConnHandlerCtx.newPromise());
+ } else {
+ throw new UnsupportedMessageTypeException(msg, Http2Headers.class,
+ ByteBuf.class);
+ }
+ in.remove();
+ flush = true;
+ }
+ if (flush) {
+ http2ConnHandlerCtx.channel().flush();
+ }
+ }
+
+ /**
+ * Append a message to the inbound queue of this channel. You need to call
+ * {@link #read()} if you want to pass the message to handlers.
+ */
+ void writeInbound(Object msg) {
+ inboundMessageQueue.add(msg);
+ }
+
+ private static final ImmutableSet<State> REMOTE_SIDE_CLOSED_STATES =
+ ImmutableSet.of(State.HALF_CLOSED_REMOTE, State.PRE_CLOSED, State.CLOSED);
+
+ /**
+ * @return true if remote side finishes sending data to us.
+ */
+ public boolean remoteSideClosed() {
+ return REMOTE_SIDE_CLOSED_STATES.contains(state);
+ }
+
+ private static final ImmutableSet<State> LOCAL_SIDE_CLOSED_STATES =
+ ImmutableSet.of(State.HALF_CLOSED_LOCAL, State.PRE_CLOSED, State.CLOSED);
+
+ /**
+ * @return true if we finish sending data to remote side.
+ */
+ public boolean localSideClosed() {
+ return LOCAL_SIDE_CLOSED_STATES.contains(state);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java
new file mode 100644
index 0000000..b72b09a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/LastHttp2Message.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.handler.codec.http.LastHttpContent;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Used to tell an inbound handler that the remote side of an HTTP/2 stream is
+ * closed, or used by an outbound handler to tell the HTTP/2 stream to close
+ * local side.
+ * @see LastHttpContent#EMPTY_LAST_CONTENT
+ */
+@InterfaceAudience.Private
+public final class LastHttp2Message {
+
+ private static final LastHttp2Message INSTANCE = new LastHttp2Message();
+
+ private LastHttp2Message() {
+ }
+
+ /**
+ * Get the singleton <tt>LastHttp2Message</tt> instance.
+ */
+ public static LastHttp2Message get() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
new file mode 100644
index 0000000..1ee733d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2ConnectionHandler.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.handler.codec.http2.DefaultHttp2Connection;
+import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
+import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2ConnectionHandler;
+import io.netty.handler.codec.http2.Http2FrameListener;
+import io.netty.handler.codec.http2.Http2FrameLogger;
+import io.netty.handler.codec.http2.Http2FrameReader;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2InboundFrameLogger;
+import io.netty.handler.codec.http2.Http2OutboundFrameLogger;
+import io.netty.handler.logging.LogLevel;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An {@link Http2ConnectionHandler} used at server side.
+ */
+@InterfaceAudience.Private
+public class ServerHttp2ConnectionHandler extends Http2ConnectionHandler {
+
+ private static final Log LOG = LogFactory
+ .getLog(ServerHttp2ConnectionHandler.class);
+
+ private static final Http2FrameLogger FRAME_LOGGER = new Http2FrameLogger(
+ LogLevel.INFO, ServerHttp2ConnectionHandler.class);
+
+ private ServerHttp2ConnectionHandler(Http2Connection connection,
+ Http2FrameReader frameReader, Http2FrameWriter frameWriter,
+ Http2FrameListener listener) {
+ super(connection, frameReader, frameWriter, listener);
+ }
+
+ /**
+ * Create and initialize an {@link ServerHttp2ConnectionHandler}.
+ * @param channel
+ * @param initializer
+ * @param verbose whether to log inbound and outbound HTTP/2 messages
+ * @return the initialized {@link ServerHttp2ConnectionHandler}
+ */
+ public static ServerHttp2ConnectionHandler create(Channel channel,
+ ChannelInitializer<Http2StreamChannel> initializer) {
+ Http2Connection conn = new DefaultHttp2Connection(true);
+ ServerHttp2EventListener listener =
+ new ServerHttp2EventListener(channel, conn, initializer);
+ conn.addListener(listener);
+ Http2FrameReader frameReader;
+ Http2FrameWriter frameWriter;
+ if (LOG.isDebugEnabled()) {
+ frameReader =
+ new Http2InboundFrameLogger(new DefaultHttp2FrameReader(),
+ FRAME_LOGGER);
+ frameWriter =
+ new Http2OutboundFrameLogger(new DefaultHttp2FrameWriter(),
+ FRAME_LOGGER);
+ } else {
+ frameReader = new DefaultHttp2FrameReader();
+ frameWriter = new DefaultHttp2FrameWriter();
+ }
+ return new ServerHttp2ConnectionHandler(conn, frameReader, frameWriter,
+ listener);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
new file mode 100644
index 0000000..72e3879
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/http2/ServerHttp2EventListener.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Connection.PropertyKey;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2EventAdapter;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * An HTTP/2 FrameListener and EventListener to manage
+ * {@link Http2StreamChannel}s.
+ * <p>
+ * We do not handle onRstStreamRead here, a stream that being reset will also
+ * call onStreamClosed. The upper layer should not rely on a reset event.
+ */
+@InterfaceAudience.Private
+public class ServerHttp2EventListener extends Http2EventAdapter {
+
+ private final Channel parentChannel;
+
+ private final ChannelInitializer<Http2StreamChannel> subChannelInitializer;
+
+ private final Http2Connection conn;
+
+ private final PropertyKey subChannelPropKey;
+
+ public ServerHttp2EventListener(Channel parentChannel, Http2Connection conn,
+ ChannelInitializer<Http2StreamChannel> subChannelInitializer) {
+ this.parentChannel = parentChannel;
+ this.conn = conn;
+ this.subChannelInitializer = subChannelInitializer;
+ this.subChannelPropKey = conn.newKey();
+ }
+
+ @Override
+ public void onStreamActive(final Http2Stream stream) {
+ Http2StreamChannel subChannel =
+ new Http2StreamChannel(parentChannel, stream);
+ stream.setProperty(subChannelPropKey, subChannel);
+ subChannel.pipeline().addFirst(subChannelInitializer);
+ parentChannel.eventLoop().register(subChannel)
+ .addListener(new FutureListener<Void>() {
+
+ @Override
+ public void operationComplete(Future<Void> future) throws Exception {
+ if (!future.isSuccess()) {
+ stream.removeProperty(subChannelPropKey);
+ }
+ }
+
+ });
+
+ }
+
+ @Override
+ public void onStreamClosed(Http2Stream stream) {
+ Http2StreamChannel subChannel = stream.removeProperty(subChannelPropKey);
+ if (subChannel != null) {
+ subChannel.close();
+ }
+ }
+
+ private Http2StreamChannel getSubChannel(int streamId) throws Http2Exception {
+ Http2StreamChannel subChannel =
+ conn.stream(streamId).getProperty(subChannelPropKey);
+ if (subChannel == null) {
+ throw Http2Exception.streamError(streamId, Http2Error.INTERNAL_ERROR,
+ "No sub channel found");
+ }
+ return subChannel;
+ }
+
+ private void writeInbound(int streamId, Object msg, boolean endOfStream)
+ throws Http2Exception {
+ Http2StreamChannel subChannel = getSubChannel(streamId);
+ subChannel.writeInbound(msg);
+ if (endOfStream) {
+ subChannel.writeInbound(LastHttp2Message.get());
+ }
+ if (subChannel.config().isAutoRead()) {
+ subChannel.read();
+ }
+
+ }
+
+ @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+ Http2Headers headers, int padding, boolean endOfStream)
+ throws Http2Exception {
+ writeInbound(streamId, headers, endOfStream);
+ }
+
+ @Override
+ public void onHeadersRead(ChannelHandlerContext ctx, int streamId,
+ Http2Headers headers, int streamDependency, short weight,
+ boolean exclusive, int padding, boolean endOfStream)
+ throws Http2Exception {
+ onHeadersRead(ctx, streamId, headers, padding, endOfStream);
+ }
+
+ @Override
+ public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data,
+ int padding, boolean endOfStream) throws Http2Exception {
+ int pendingBytes = data.readableBytes() + padding;
+ writeInbound(streamId, data.retain(), endOfStream);
+ return pendingBytes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
index eb8b918..1e1acdd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/Http2ResponseHandler.java
@@ -23,14 +23,15 @@ import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.HttpUtil;
import io.netty.util.concurrent.Promise;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
+import net.sf.ehcache.store.chm.ConcurrentHashMap;
public class Http2ResponseHandler extends
SimpleChannelInboundHandler<FullHttpResponse> {
- private Map<Integer, Promise<FullHttpResponse>> streamId2Promise =
- new HashMap<>();
+ private ConcurrentMap<Integer, Promise<FullHttpResponse>> streamId2Promise =
+ new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg)
@@ -46,10 +47,7 @@ public class Http2ResponseHandler extends
// this is the upgrade response message, just ignore it.
return;
}
- Promise<FullHttpResponse> promise;
- synchronized (this) {
- promise = streamId2Promise.get(streamId);
- }
+ Promise<FullHttpResponse> promise = streamId2Promise.get(streamId);
if (promise == null) {
System.err.println("Message received for unknown stream id " + streamId);
} else {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
index 4e91004..eaa63a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/web/dtp/TestDtpHttp2.java
@@ -136,10 +136,8 @@ public class TestDtpHttp2 {
request.headers().add(HttpUtil.ExtensionHeaderNames.STREAM_ID.text(),
streamId);
Promise<FullHttpResponse> promise = CHANNEL.eventLoop().newPromise();
- synchronized (RESPONSE_HANDLER) {
- CHANNEL.writeAndFlush(request);
- RESPONSE_HANDLER.put(streamId, promise);
- }
+ RESPONSE_HANDLER.put(streamId, promise);
+ CHANNEL.writeAndFlush(request);
assertEquals(HttpResponseStatus.OK, promise.get().status());
ByteBuf content = promise.get().content();
assertEquals("HTTP/2 DTP", content.toString(StandardCharsets.UTF_8));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java
new file mode 100644
index 0000000..5f298f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/AbstractTestHttp2Server.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import java.net.InetSocketAddress;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import org.eclipse.jetty.http2.ErrorCode;
+import org.eclipse.jetty.http2.api.Session;
+import org.eclipse.jetty.http2.client.HTTP2Client;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
+
+public abstract class AbstractTestHttp2Server {
+
+ protected EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+
+ protected EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+ protected Channel server;
+
+ protected HTTP2Client client = new HTTP2Client();
+
+ protected Session session;
+
+ protected abstract Channel initServer();
+
+ protected final void start() throws Exception {
+ server = initServer();
+ client.start();
+ int port = ((InetSocketAddress) server.localAddress()).getPort();
+ FuturePromise<Session> sessionPromise = new FuturePromise<>();
+ client.connect(new InetSocketAddress("127.0.0.1", port),
+ new Session.Listener.Adapter(), sessionPromise);
+ session = sessionPromise.get();
+ }
+
+ protected final void stop() throws Exception {
+ if (session != null) {
+ session.close(ErrorCode.NO_ERROR.code, "", new Callback.Adapter());
+ }
+ if (server != null) {
+ server.close();
+ }
+ client.stop();
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java
new file mode 100644
index 0000000..7194490
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/StreamListener.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http.MetaData.Response;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.ResetFrame;
+import org.eclipse.jetty.util.Callback;
+
+public class StreamListener extends Stream.Listener.Adapter {
+
+ private boolean finish = false;
+
+ private byte[] buf = new byte[0];
+
+ private int status = -1;
+
+ private boolean reset;
+
+ @Override
+ public void onData(Stream stream, DataFrame frame, Callback callback) {
+ synchronized (this) {
+ if (reset) {
+ callback.failed(new IllegalStateException("Stream already closed"));
+ }
+ if (status == -1) {
+ callback
+ .failed(new IllegalStateException("Haven't received header yet"));
+ }
+ int bufLen = buf.length;
+ int newBufLen = bufLen + frame.getData().remaining();
+ buf = Arrays.copyOf(buf, newBufLen);
+ frame.getData().get(buf, bufLen, frame.getData().remaining());
+ if (frame.isEndStream()) {
+ finish = true;
+ }
+ notifyAll();
+ callback.succeeded();
+ }
+ }
+
+ @Override
+ public void onHeaders(Stream stream, HeadersFrame frame) {
+ synchronized (this) {
+ if (reset) {
+ throw new IllegalStateException("Stream already closed");
+ }
+ if (status != -1) {
+ throw new IllegalStateException("Header already received");
+ }
+ MetaData meta = frame.getMetaData();
+ if (!meta.isResponse()) {
+ throw new IllegalStateException("Received non-response header");
+ }
+ status = ((Response) meta).getStatus();
+ if (frame.isEndStream()) {
+ finish = true;
+ notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void onReset(Stream stream, ResetFrame frame) {
+ synchronized (this) {
+ reset = true;
+ finish = true;
+ notifyAll();
+ }
+ }
+
+ public int getStatus() throws InterruptedException, IOException {
+ synchronized (this) {
+ while (!finish) {
+ wait();
+ }
+ if (reset) {
+ throw new IOException("Stream reset");
+ }
+ return status;
+ }
+ }
+
+ public byte[] getData() throws InterruptedException, IOException {
+ synchronized (this) {
+ while (!finish) {
+ wait();
+ }
+ if (reset) {
+ throw new IOException("Stream reset");
+ }
+ return buf;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
new file mode 100644
index 0000000..6a8495b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2Server.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertEquals;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.ErrorCode;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PriorityFrame;
+import org.eclipse.jetty.http2.frames.ResetFrame;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestHttp2Server extends AbstractTestHttp2Server {
+
+ private final AtomicInteger handlerClosedCount = new AtomicInteger(0);
+
+ private final class HelloWorldHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+ if (msg instanceof Http2Headers) {
+ ctx.writeAndFlush(new DefaultHttp2Headers()
+ .status(HttpResponseStatus.OK.codeAsText()));
+ } else {
+ ctx.writeAndFlush(ReferenceCountUtil.retain(msg));
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ handlerClosedCount.incrementAndGet();
+ }
+
+ }
+
+ @Override
+ protected Channel initServer() {
+ return new ServerBootstrap().group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<Channel>() {
+
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ch.pipeline().addLast(
+ ServerHttp2ConnectionHandler.create(ch,
+ new ChannelInitializer<Http2StreamChannel>() {
+
+ @Override
+ protected void initChannel(Http2StreamChannel ch)
+ throws Exception {
+ ch.pipeline().addLast(new HelloWorldHandler());
+ }
+ }));
+ }
+
+ }).bind(0).syncUninterruptibly().channel();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ stop();
+ }
+
+ @Test
+ public void test() throws InterruptedException, ExecutionException,
+ IOException {
+ HttpFields fields = new HttpFields();
+ fields.put(HttpHeader.C_METHOD, HttpMethod.GET.asString());
+ fields.put(HttpHeader.C_PATH, "/");
+ FuturePromise<Stream> streamPromise = new FuturePromise<>();
+ StreamListener listener = new StreamListener();
+ session.newStream(new HeadersFrame(1, new MetaData(
+ org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame(
+ 1, 0, 1, false), false), streamPromise, listener);
+ Stream stream = streamPromise.get();
+ stream.data(
+ new DataFrame(stream.getId(), ByteBuffer.wrap("Hello World"
+ .getBytes(StandardCharsets.UTF_8)), true), new Callback.Adapter());
+ assertEquals("Hello World", new String(listener.getData(),
+ StandardCharsets.UTF_8));
+
+ streamPromise = new FuturePromise<>();
+ listener = new StreamListener();
+ session.newStream(new HeadersFrame(1, new MetaData(
+ org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame(
+ 1, 0, 1, false), false), streamPromise, listener);
+ stream = streamPromise.get();
+ stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code),
+ new Callback.Adapter());
+ Thread.sleep(1000);
+ assertEquals(2, handlerClosedCount.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
new file mode 100644
index 0000000..e583ca3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/http2/TestHttp2ServerMultiThread.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.http2;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.util.ResourceLeakDetector;
+import io.netty.util.ResourceLeakDetector.Level;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.http.HttpFields;
+import org.eclipse.jetty.http.HttpHeader;
+import org.eclipse.jetty.http.HttpMethod;
+import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.http.MetaData;
+import org.eclipse.jetty.http2.ErrorCode;
+import org.eclipse.jetty.http2.api.Stream;
+import org.eclipse.jetty.http2.frames.DataFrame;
+import org.eclipse.jetty.http2.frames.HeadersFrame;
+import org.eclipse.jetty.http2.frames.PriorityFrame;
+import org.eclipse.jetty.http2.frames.ResetFrame;
+import org.eclipse.jetty.util.Callback;
+import org.eclipse.jetty.util.FuturePromise;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class TestHttp2ServerMultiThread extends AbstractTestHttp2Server {
+
+ private final class DispatchHandler extends
+ SimpleChannelInboundHandler<Http2Headers> {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Http2Headers msg)
+ throws Exception {
+ ctx.writeAndFlush(new DefaultHttp2Headers().status(HttpResponseStatus.OK
+ .codeAsText()));
+ ctx.pipeline().remove(this)
+ .addLast(new EchoHandler(), new EndStreamHandler());
+ }
+ }
+
+ private final class EchoHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
+ throws Exception {
+ ByteBuf out = msg.readBytes(msg.readableBytes());
+ ctx.writeAndFlush(out);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ handlerClosedCount.incrementAndGet();
+ }
+
+ }
+
+ private final class EndStreamHandler extends
+ SimpleChannelInboundHandler<LastHttp2Message> {
+
+ @Override
+ protected void
+ channelRead0(ChannelHandlerContext ctx, LastHttp2Message msg)
+ throws Exception {
+ ctx.writeAndFlush(msg);
+ }
+
+ }
+
+ private final AtomicInteger handlerClosedCount = new AtomicInteger(0);
+
+ private int concurrency = 10;
+
+ private ExecutorService executor = Executors.newFixedThreadPool(concurrency,
+ new ThreadFactoryBuilder().setNameFormat("Echo-Client-%d").setDaemon(true)
+ .build());
+
+ private int requestCount = 10000;
+
+ @Override
+ protected Channel initServer() {
+ return new ServerBootstrap().group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new ChannelInitializer<Channel>() {
+
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ch.pipeline().addLast(
+ ServerHttp2ConnectionHandler.create(ch,
+ new ChannelInitializer<Http2StreamChannel>() {
+
+ @Override
+ protected void initChannel(Http2StreamChannel ch)
+ throws Exception {
+ ch.pipeline().addLast(new DispatchHandler());
+ }
+ }));
+ }
+
+ }).bind(0).syncUninterruptibly().channel();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ ResourceLeakDetector.setLevel(Level.ADVANCED);
+ start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ executor.shutdownNow();
+ stop();
+ }
+
+ private void testEcho() throws InterruptedException, ExecutionException,
+ IOException {
+ HttpFields fields = new HttpFields();
+ fields.put(HttpHeader.C_METHOD, HttpMethod.GET.asString());
+ fields.put(HttpHeader.C_PATH, "/");
+ FuturePromise<Stream> streamPromise = new FuturePromise<>();
+ StreamListener listener = new StreamListener();
+ session.newStream(new HeadersFrame(1, new MetaData(
+ org.eclipse.jetty.http.HttpVersion.HTTP_2, fields), new PriorityFrame(
+ 1, 0, 1, false), false), streamPromise, listener);
+ Stream stream = streamPromise.get();
+ if (ThreadLocalRandom.current().nextInt(5) < 1) { // 20%
+ stream.reset(new ResetFrame(stream.getId(), ErrorCode.NO_ERROR.code),
+ new Callback.Adapter());
+ } else {
+ int numFrames = ThreadLocalRandom.current().nextInt(1, 3);
+ ByteArrayOutputStream msg = new ByteArrayOutputStream();
+ for (int i = 0; i < numFrames; i++) {
+ byte[] frame = new byte[ThreadLocalRandom.current().nextInt(10, 100)];
+ ThreadLocalRandom.current().nextBytes(frame);
+ stream.data(new DataFrame(stream.getId(), ByteBuffer.wrap(frame),
+ i == numFrames - 1), new Callback.Adapter());
+ msg.write(frame);
+ }
+ assertEquals(HttpStatus.OK_200, listener.getStatus());
+ assertArrayEquals(msg.toByteArray(), listener.getData());
+ }
+ }
+
+ @Test
+ public void test() throws InterruptedException {
+ final AtomicBoolean succ = new AtomicBoolean(true);
+ for (int i = 0; i < requestCount; i++) {
+ executor.execute(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ testEcho();
+ } catch (Throwable t) {
+ t.printStackTrace();
+ succ.set(false);
+ }
+ }
+ });
+ }
+ executor.shutdown();
+ assertTrue(executor.awaitTermination(5, TimeUnit.MINUTES));
+ assertTrue(succ.get());
+ Thread.sleep(1000);
+ assertEquals(requestCount, handlerClosedCount.get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d7e4ad4a/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 636e063..f7e613c 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -588,6 +588,13 @@
</dependency>
<dependency>
+ <groupId>org.eclipse.jetty.http2</groupId>
+ <artifactId>http2-client</artifactId>
+ <version>9.3.0.M2</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.twitter</groupId>
<artifactId>hpack</artifactId>
<version>0.11.0</version>