You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by wu...@apache.org on 2018/12/03 06:21:47 UTC
[servicecomb-java-chassis] 01/02: [SCB-968]968 http2 do not support
pump download
This is an automated email from the ASF dual-hosted git repository.
wujimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit fde6f4dc8c52360476dcfe0b4845a17abae2a758
Author: heyile <25...@qq.com>
AuthorDate: Thu Nov 22 20:14:59 2018 +0800
[SCB-968]968 http2 do not support pump download
---
.../foundation/vertx/stream/PumpFactoryImpl.java | 40 ++++++++
.../foundation/vertx/stream/PumpImpl.java | 108 +++++++++++++++++++++
.../services/io.vertx.core.spi.PumpFactory | 1 +
.../vertx/stream/TestPumpFactoryImpl.java | 33 +++++++
.../foundation/vertx/stream/TestPumpImpl.java | 50 ++++++++++
.../org/apache/servicecomb/it/ConsumerMain.java | 5 +-
.../servicecomb/it/schema/DownloadSchema.java | 1 -
7 files changed, 233 insertions(+), 5 deletions(-)
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java
new file mode 100644
index 0000000..c3284e3
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpFactoryImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import java.util.Objects;
+
+import io.vertx.core.spi.PumpFactory;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+public class PumpFactoryImpl implements PumpFactory { // PumpFactory implementation that always creates PumpImpl pumps
+ @Override
+ public <T> Pump pump(ReadStream<T> rs, WriteStream<T> ws) {
+ Objects.requireNonNull(rs); // reject a null read stream before constructing the pump
+ Objects.requireNonNull(ws); // reject a null write stream before constructing the pump
+ return new PumpImpl<>(rs, ws);
+ }
+
+ @Override
+ public <T> Pump pump(ReadStream<T> rs, WriteStream<T> ws, int writeQueueMaxSize) {
+ Objects.requireNonNull(rs);
+ Objects.requireNonNull(ws);
+ return new PumpImpl<>(rs, ws, writeQueueMaxSize); // this constructor also applies writeQueueMaxSize to ws
+ }
+}
\ No newline at end of file
diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java
new file mode 100644
index 0000000..3657e73
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+
+public class PumpImpl<T> implements Pump { // Pump that writes items from a ReadStream to a WriteStream, skipping empty Buffers
+
+ private final ReadStream<T> readStream;
+
+ private final WriteStream<T> writeStream;
+
+ private final Handler<T> dataHandler;
+
+ private final Handler<Void> drainHandler;
+
+ private int pumped; // number of items written so far; only touched through the synchronized methods below
+
+ public PumpImpl(ReadStream<T> readStream, WriteStream<T> writeStream, int maxWriteQueueSize) {
+ this(readStream, writeStream);
+ this.writeStream.setWriteQueueMaxSize(maxWriteQueueSize);
+ }
+
+ public PumpImpl(ReadStream<T> readStream, WriteStream<T> writeStream) {
+ this.readStream = readStream;
+ this.writeStream = writeStream;
+ drainHandler = v -> readStream.resume(); // registered on the write stream below; resumes the paused read stream
+ dataHandler = data -> {
+ if (data instanceof Buffer) {
+ if (((Buffer) data).length() == 0) { // empty Buffer: nothing to write, and it is not counted as pumped
+ return;
+ }
+ }
+ writeStream.write(data);
+ incPumped();
+ if (writeStream.writeQueueFull()) { // back-pressure: stop reading until the drain handler resumes the read stream
+ readStream.pause();
+ writeStream.drainHandler(drainHandler);
+ }
+ };
+ }
+
+
+ /**
+ * Set the max size of the write stream's write queue to {@code maxSize}.
+ */
+ @Override
+ public PumpImpl<T> setWriteQueueMaxSize(int maxSize) {
+ writeStream.setWriteQueueMaxSize(maxSize);
+ return this;
+ }
+
+ /**
+ * Start the Pump by registering the data handler on the read stream. The Pump can be started and stopped multiple times.
+ */
+ @Override
+ public PumpImpl<T> start() {
+ readStream.handler(dataHandler);
+ return this;
+ }
+
+ /**
+ * Stop the Pump by clearing the write stream's drain handler and the read stream's data handler. The Pump can be started and stopped multiple times.
+ */
+ @Override
+ public PumpImpl<T> stop() {
+ writeStream.drainHandler(null);
+ readStream.handler(null);
+ return this;
+ }
+
+ /**
+ * Return the total number of elements written by this pump; empty Buffers are skipped and therefore not counted.
+ */
+ @Override
+ public synchronized int numberPumped() {
+ return pumped;
+ }
+
+ // Note: we synchronize because numberPumped can be called from a different thread. incPumped, however, is always
+ // called from the same thread, so we benefit from the biased locking optimisation, which should give a very low
+ // overhead.
+ private synchronized void incPumped() {
+ pumped++;
+ }
+
+ public Handler<T> getDataHandler() { // exposes the data handler so that it can be invoked directly (TestPumpImpl does this)
+ return dataHandler;
+ }
+}
diff --git a/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory
new file mode 100644
index 0000000..1d25eef
--- /dev/null
+++ b/foundations/foundation-vertx/src/main/resources/META-INF/services/io.vertx.core.spi.PumpFactory
@@ -0,0 +1 @@
+org.apache.servicecomb.foundation.vertx.stream.PumpFactoryImpl
\ No newline at end of file
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java
new file mode 100644
index 0000000..f86f46d
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpFactoryImpl.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.streams.Pump;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+import mockit.Mocked;
+
+public class TestPumpFactoryImpl {
+ @Test
+ public void pump(@Mocked ReadStream<Object> rs, @Mocked WriteStream<Object> ws) {
+ Pump pump = Pump.pump(rs, ws); // presumably resolved to PumpFactoryImpl via the META-INF/services entry - TODO confirm
+ Assert.assertTrue(pump instanceof PumpImpl);
+ }
+}
diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java
new file mode 100644
index 0000000..a2ccb17
--- /dev/null
+++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/stream/TestPumpImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicecomb.foundation.vertx.stream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.vertx.core.Handler;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.streams.ReadStream;
+import io.vertx.core.streams.WriteStream;
+import mockit.Expectations;
+import mockit.Mocked;
+
+public class TestPumpImpl {
+
+ @Test
+ public void testPumpWithPending(@Mocked ReadStream<Object> rs, @Mocked WriteStream<Object> ws, @Mocked Buffer zeroBuf,
+ @Mocked Buffer contentBuf) {
+ PumpImpl<Object> pump = new PumpImpl<>(rs, ws);
+ Handler<Object> handler = pump.getDataHandler(); // drive the data handler directly instead of starting the pump
+ new Expectations() {
+ {
+ zeroBuf.length(); // zeroBuf stands in for an empty Buffer
+ result = 0;
+ contentBuf.length(); // contentBuf stands in for a non-empty Buffer
+ result = 1;
+ }
+ };
+ handler.handle(zeroBuf); // the empty buffer is skipped, so it must not be counted
+ handler.handle(contentBuf);
+ Assert.assertEquals(1, pump.numberPumped()); // only contentBuf has been counted so far
+ handler.handle(contentBuf);
+ Assert.assertEquals(2, pump.numberPumped());
+ }
+}
diff --git a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java
index 2209583..0ef5039 100644
--- a/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java
+++ b/integration-tests/it-consumer/src/main/java/org/apache/servicecomb/it/ConsumerMain.java
@@ -103,10 +103,7 @@ public class ConsumerMain {
// only rest support default value feature
ITJUnitUtils.runWithRest(TestDefaultValue.class);
- // currently have bug with http2
- if (!ITJUnitUtils.getProducerName().endsWith("-h2") && !ITJUnitUtils.getProducerName().endsWith("-h2c")) {
- ITJUnitUtils.runWithRest(TestDownload.class);
- }
+ ITJUnitUtils.runWithRest(TestDownload.class);
ITJUnitUtils.runWithHighwayAndRest(TestTrace.class);
ITJUnitUtils.run(TestTraceEdge.class);
diff --git a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java
index b1123b6..5ba7d8b 100644
--- a/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java
+++ b/integration-tests/it-producer/src/main/java/org/apache/servicecomb/it/schema/DownloadSchema.java
@@ -197,7 +197,6 @@ public class DownloadSchema implements BootListener {
.header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE)
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment;filename=netInputStream.txt")
.body(conn.getInputStream());
- conn.disconnect();
return responseEntity;
}