You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2018/12/28 02:56:50 UTC

[GitHub] dongeforever closed pull request #638: [ISSUE#525] add aclClient RPCHook for message track

dongeforever closed pull request #638: [ISSUE#525] add aclClient RPCHook for message track
URL: https://github.com/apache/rocketmq/pull/638
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it once the pull request is merged):

diff --git a/README.md b/README.md
index a5f47e597..a6493b6f5 100644
--- a/README.md
+++ b/README.md
@@ -37,8 +37,7 @@ It offers a variety of features:
 ----------
 
 ## Apache RocketMQ Community
-* [RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)
-
+[RocketMQ Community Incubator Projects](https://github.com/apache/rocketmq-externals)
 ----------
 
 ## Contributing
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
index 4169d88fe..eec626357 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/AclSignerTest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.acl.common;
 
 import org.junit.Test;
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
index 31820ad7d..253b5b241 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/PermissionTest.java
@@ -153,4 +153,16 @@ public void checkAdminCodeTest() {
             }
         }
     }
+
+    @Test
+    public void AclExceptionTest(){
+        AclException aclException = new AclException("CAL_SIGNATURE_FAILED",10015);
+        AclException aclExceptionWithMessage = new AclException("CAL_SIGNATURE_FAILED",10015,"CAL_SIGNATURE_FAILED Exception");
+        Assert.assertEquals(aclException.getCode(),10015);
+        Assert.assertEquals(aclExceptionWithMessage.getStatus(),"CAL_SIGNATURE_FAILED");
+        aclException.setCode(10016);
+        Assert.assertEquals(aclException.getCode(),10016);
+        aclException.setStatus("netaddress examine scope Exception netaddress");
+        Assert.assertEquals(aclException.getStatus(),"netaddress examine scope Exception netaddress");
+    }
 }
diff --git a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
index b6f9b8ce0..a1a4bde4f 100644
--- a/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
+++ b/acl/src/test/java/org/apache/rocketmq/acl/common/SessionCredentialsTest.java
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.rocketmq.acl.common;
 
 import org.junit.Assert;
@@ -25,5 +41,51 @@ public void updateContentTest(){
         sessionCredentials.updateContent(properties);
     }
 
+    @Test
+    public void SessionCredentialHashCodeTest(){
+        SessionCredentials sessionCredentials=new SessionCredentials();
+        Properties properties=new Properties();
+        properties.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+        properties.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+        properties.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+        sessionCredentials.updateContent(properties);
+        Assert.assertEquals(sessionCredentials.hashCode(),353652211);
+    }
+
+    @Test
+    public void SessionCredentialEqualsTest(){
+        SessionCredentials sessionCredential1 =new SessionCredentials();
+        Properties properties1=new Properties();
+        properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+        properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+        properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+        sessionCredential1.updateContent(properties1);
+
+        SessionCredentials sessionCredential2 =new SessionCredentials();
+        Properties properties2=new Properties();
+        properties2.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+        properties2.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+        properties2.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+        sessionCredential2.updateContent(properties2);
+
+        Assert.assertTrue(sessionCredential2.equals(sessionCredential1));
+        sessionCredential2.setSecretKey("1234567899");
+        sessionCredential2.setSignature("1234567899");
+        Assert.assertFalse(sessionCredential2.equals(sessionCredential1));
+    }
+
+    @Test
+    public void SessionCredentialToStringTest(){
+        SessionCredentials sessionCredential1 =new SessionCredentials();
+        Properties properties1=new Properties();
+        properties1.setProperty(SessionCredentials.ACCESS_KEY,"RocketMQ");
+        properties1.setProperty(SessionCredentials.SECRET_KEY,"12345678");
+        properties1.setProperty(SessionCredentials.SECURITY_TOKEN,"abcd");
+        sessionCredential1.updateContent(properties1);
+
+        Assert.assertEquals(sessionCredential1.toString(),
+            "SessionCredentials [accessKey=RocketMQ, secretKey=12345678, signature=null, SecurityToken=abcd]");
+    }
+
 
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java
new file mode 100644
index 000000000..508635c04
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/ManyMessageTransferTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.pagecache;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ManyMessageTransferTest {
+
+    @Test
+    public void ManyMessageTransferBuilderTest(){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+        byteBuffer.putInt(20);
+        GetMessageResult getMessageResult = new GetMessageResult();
+        ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+    }
+
+    @Test
+    public void ManyMessageTransferPosTest(){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+        byteBuffer.putInt(20);
+        GetMessageResult getMessageResult = new GetMessageResult();
+        ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+        Assert.assertEquals(manyMessageTransfer.position(),4);
+    }
+
+    @Test
+    public void ManyMessageTransferCountTest(){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+        byteBuffer.putInt(20);
+        GetMessageResult getMessageResult = new GetMessageResult();
+        ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+
+        Assert.assertEquals(manyMessageTransfer.count(),20);
+
+    }
+
+    @Test
+    public void ManyMessageTransferCloseTest(){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+        byteBuffer.putInt(20);
+        GetMessageResult getMessageResult = new GetMessageResult();
+        ManyMessageTransfer manyMessageTransfer = new ManyMessageTransfer(byteBuffer,getMessageResult);
+        manyMessageTransfer.close();
+        manyMessageTransfer.deallocate();
+    }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java
new file mode 100644
index 000000000..2cd4bdc16
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pagecache/OneMessageTransferTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.pagecache;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.MappedFile;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OneMessageTransferTest {
+
+    @Test
+    public void OneMessageTransferTest(){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+        byteBuffer.putInt(20);
+        SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
+        OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
+    }
+
+    @Test
+    public void OneMessageTransferCountTest(){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+        byteBuffer.putInt(20);
+        SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
+        OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
+        Assert.assertEquals(manyMessageTransfer.count(),40);
+    }
+
+    @Test
+    public void OneMessageTransferPosTest(){
+        ByteBuffer byteBuffer = ByteBuffer.allocate(20);
+        byteBuffer.putInt(20);
+        SelectMappedBufferResult selectMappedBufferResult = new SelectMappedBufferResult(0,byteBuffer,20,new MappedFile());
+        OneMessageTransfer manyMessageTransfer = new OneMessageTransfer(byteBuffer,selectMappedBufferResult);
+        Assert.assertEquals(manyMessageTransfer.position(),8);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 179a80daa..2149d6756 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -309,7 +309,7 @@ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
                 } else {
                     tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
                 }
-                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
                 dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
                 traceDispatcher = dispatcher;
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 3c33d2eed..22c676055 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -178,7 +178,7 @@ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean ms
                 } else {
                     tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC);
                 }
-                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties);
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties, rpcHook);
                 dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                 traceDispatcher = dispatcher;
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
index 90b00d414..496d290f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -49,6 +49,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.rocketmq.remoting.RPCHook;
 
 /**
  * Created by zongtanghu on 2018/11/6.
@@ -74,7 +75,7 @@
     private String dispatcherId = UUID.randomUUID().toString();
     private String traceTopicName;
 
-    public AsyncArrayDispatcher(Properties properties) throws MQClientException {
+    public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) throws MQClientException {
         dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE);
         int queueSize = Integer.parseInt(properties.getProperty(TrackTraceConstants.ASYNC_BUFFER_SIZE, "2048"));
         // queueSize is greater than or equal to the n power of 2 of value
@@ -92,7 +93,7 @@ public AsyncArrayDispatcher(Properties properties) throws MQClientException {
             TimeUnit.MILLISECONDS, //
             this.appenderQueue, //
             new ThreadFactoryImpl("MQTraceSendThread_"));
-        traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties);
+        traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
     }
 
     public String getTraceTopicName() {
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
index 27447df73..37c39a14e 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/TrackTraceProducerFactory.java
@@ -25,6 +25,7 @@
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.remoting.RPCHook;
 
 public class TrackTraceProducerFactory {
 
@@ -33,10 +34,10 @@
     private static DefaultMQProducer traceProducer;
 
 
-    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
+    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
         if (traceProducer == null) {
 
-            traceProducer = new DefaultMQProducer();
+            traceProducer = new DefaultMQProducer(rpcHook);
             traceProducer.setProducerGroup(TrackTraceConstants.GROUP_NAME);
             traceProducer.setSendMsgTimeout(5000);
             traceProducer.setInstanceName(properties.getProperty(TrackTraceConstants.INSTANCE_NAME, String.valueOf(System.currentTimeMillis())));
diff --git a/pom.xml b/pom.xml
index c20b04cb3..1e3c4bcc8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -259,6 +259,7 @@
                         <exclude>src/test/resources/certs/*</exclude>
                         <exclude>src/test/**/*.log</exclude>
                         <exclude>src/test/resources/META-INF/service/*</exclude>
+                        <exclude>src/main/resources/META-INF/service/*</exclude>
                         <exclude>*/target/**</exclude>
                         <exclude>*/*.iml</exclude>
                     </excludes>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services