You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by yu...@apache.org on 2016/12/28 02:44:08 UTC
[12/58] [abbrv] [partial] incubator-rocketmq git commit: ROCKETMQ-18
Rename package name from com.alibaba to org.apache
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
new file mode 100644
index 0000000..c9303b7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/ConfigManager.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public abstract class ConfigManager {
+ private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+
+ public abstract String encode();
+
+ public boolean load() {
+ String fileName = null;
+ try {
+ fileName = this.configFilePath();
+ String jsonString = MixAll.file2String(fileName);
+
+ if (null == jsonString || jsonString.length() == 0) {
+ return this.loadBak();
+ } else {
+ this.decode(jsonString);
+ PLOG.info("load {} OK", fileName);
+ return true;
+ }
+ } catch (Exception e) {
+ PLOG.error("load " + fileName + " Failed, and try to load backup file", e);
+ return this.loadBak();
+ }
+ }
+
+ public abstract String configFilePath();
+
+ private boolean loadBak() {
+ String fileName = null;
+ try {
+ fileName = this.configFilePath();
+ String jsonString = MixAll.file2String(fileName + ".bak");
+ if (jsonString != null && jsonString.length() > 0) {
+ this.decode(jsonString);
+ PLOG.info("load " + fileName + " OK");
+ return true;
+ }
+ } catch (Exception e) {
+ PLOG.error("load " + fileName + " Failed", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ public abstract void decode(final String jsonString);
+
+ public synchronized void persist() {
+ String jsonString = this.encode(true);
+ if (jsonString != null) {
+ String fileName = this.configFilePath();
+ try {
+ MixAll.string2File(jsonString, fileName);
+ } catch (IOException e) {
+ PLOG.error("persist file Exception, " + fileName, e);
+ }
+ }
+ }
+
+ public abstract String encode(final boolean prettyFormat);
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/Configuration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/Configuration.java b/common/src/main/java/org/apache/rocketmq/common/Configuration.java
new file mode 100644
index 0000000..0ab7c0d
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/Configuration.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author xigu.lx
+ */
+public class Configuration {
+
+ private final Logger log;
+
+ private List<Object> configObjectList = new ArrayList<Object>(4);
+ private String storePath;
+ private boolean storePathFromConfig = false;
+ private Object storePathObject;
+ private Field storePathField;
+ private DataVersion dataVersion = new DataVersion();
+ private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ /**
+ * All properties include configs in object and extend properties.
+ */
+ private Properties allConfigs = new Properties();
+
+ public Configuration(Logger log) {
+ this.log = log;
+ }
+
+ public Configuration(Logger log, Object... configObjects) {
+ this.log = log;
+ if (configObjects == null || configObjects.length == 0) {
+ return;
+ }
+ for (Object configObject : configObjects) {
+ registerConfig(configObject);
+ }
+ }
+
+ public Configuration(Logger log, String storePath, Object... configObjects) {
+ this(log, configObjects);
+ this.storePath = storePath;
+ }
+
+ /**
+ * register config object
+ *
+ * @param configObject
+ * @return the current Configuration object
+ */
+ public Configuration registerConfig(Object configObject) {
+ try {
+ readWriteLock.writeLock().lockInterruptibly();
+
+ try {
+
+ Properties registerProps = MixAll.object2Properties(configObject);
+
+ merge(registerProps, this.allConfigs);
+
+ configObjectList.add(configObject);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("registerConfig lock error");
+ }
+ return this;
+ }
+
+ /**
+ * register config properties
+ *
+ * @param extProperties
+ * @return the current Configuration object
+ */
+ public Configuration registerConfig(Properties extProperties) {
+ if (extProperties == null) {
+ return this;
+ }
+
+ try {
+ readWriteLock.writeLock().lockInterruptibly();
+
+ try {
+ merge(extProperties, this.allConfigs);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("register lock error. {}" + extProperties);
+ }
+
+ return this;
+ }
+
+ /**
+ * The store path will be gotten from the field of object.
+ *
+ * @param object
+ * @param fieldName
+ *
+ * @throws java.lang.RuntimeException if the field of object is not exist.
+ */
+ public void setStorePathFromConfig(Object object, String fieldName) {
+ assert object != null;
+
+ try {
+ readWriteLock.writeLock().lockInterruptibly();
+
+ try {
+ this.storePathFromConfig = true;
+ this.storePathObject = object;
+ // check
+ this.storePathField = object.getClass().getDeclaredField(fieldName);
+ assert this.storePathField != null
+ && !Modifier.isStatic(this.storePathField.getModifiers());
+ this.storePathField.setAccessible(true);
+ } catch (NoSuchFieldException e) {
+ throw new RuntimeException(e);
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("setStorePathFromConfig lock error");
+ }
+ }
+
+ private String getStorePath() {
+ String realStorePath = null;
+ try {
+ readWriteLock.readLock().lockInterruptibly();
+
+ try {
+ realStorePath = this.storePath;
+
+ if (this.storePathFromConfig) {
+ try {
+ realStorePath = (String) storePathField.get(this.storePathObject);
+ } catch (IllegalAccessException e) {
+ log.error("getStorePath error, ", e);
+ }
+ }
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("getStorePath lock error");
+ }
+
+ return realStorePath;
+ }
+
+ public void update(Properties properties) {
+ try {
+ readWriteLock.writeLock().lockInterruptibly();
+
+ try {
+ // the property must be exist when update
+ mergeIfExist(properties, this.allConfigs);
+
+ for (Object configObject : configObjectList) {
+ // not allConfigs to update...
+ MixAll.properties2Object(properties, configObject);
+ }
+
+ this.dataVersion.nextVersion();
+
+ } finally {
+ readWriteLock.writeLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("update lock error, {}", properties);
+ return;
+ }
+
+ persist();
+ }
+
+ public void persist() {
+ try {
+ readWriteLock.readLock().lockInterruptibly();
+
+ try {
+ String allConfigs = getAllConfigsInternal();
+
+ MixAll.string2File(allConfigs, getStorePath());
+ } catch (IOException e) {
+ log.error("persist string2File error, ", e);
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("persist lock error");
+ }
+ }
+
+ public String getAllConfigsFormatString() {
+ try {
+ readWriteLock.readLock().lockInterruptibly();
+
+ try {
+
+ return getAllConfigsInternal();
+
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("getAllConfigsFormatString lock error");
+ }
+
+ return null;
+ }
+
+ public String getDataVersionJson() {
+ return this.dataVersion.toJson();
+ }
+
+ public Properties getAllConfigs() {
+ try {
+ readWriteLock.readLock().lockInterruptibly();
+
+ try {
+
+ return this.allConfigs;
+
+ } finally {
+ readWriteLock.readLock().unlock();
+ }
+ } catch (InterruptedException e) {
+ log.error("getAllConfigs lock error");
+ }
+
+ return null;
+ }
+
+ private String getAllConfigsInternal() {
+ StringBuilder stringBuilder = new StringBuilder();
+
+ // reload from config object ?
+ for (Object configObject : this.configObjectList) {
+ Properties properties = MixAll.object2Properties(configObject);
+ if (properties != null) {
+ merge(properties, this.allConfigs);
+ } else {
+ log.warn("getAllConfigsInternal object2Properties is null, {}", configObject.getClass());
+ }
+ }
+
+ {
+ stringBuilder.append(MixAll.properties2String(this.allConfigs));
+ }
+
+ return stringBuilder.toString();
+ }
+
+ public void setStorePath(final String storePath) {
+ this.storePath = storePath;
+ }
+
+ private void merge(Properties from, Properties to) {
+ for (Object key : from.keySet()) {
+ Object fromObj = from.get(key), toObj = to.get(key);
+ if (toObj != null && !toObj.equals(fromObj)) {
+ log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj);
+ }
+ to.put(key, fromObj);
+ }
+ }
+
+ private void mergeIfExist(Properties from, Properties to) {
+ for (Object key : from.keySet()) {
+ if (!to.containsKey(key)) {
+ continue;
+ }
+
+ Object fromObj = from.get(key), toObj = to.get(key);
+ if (toObj != null && !toObj.equals(fromObj)) {
+ log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj);
+ }
+ to.put(key, fromObj);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java
new file mode 100644
index 0000000..971c0c7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/CountDownLatch.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * Add reset feature for @see java.util.concurrent.CountDownLatch
+ *
+ * @author xinyuzhou.zxy
+ */
+public class CountDownLatch {
+ /**
+ * Synchronization control For CountDownLatch.
+ * Uses AQS state to represent count.
+ */
+ private static final class Sync extends AbstractQueuedSynchronizer {
+ private static final long serialVersionUID = 4982264981922014374L;
+
+ private final int startCount;
+
+ Sync(int count) {
+ this.startCount = count;
+ setState(count);
+ }
+
+ int getCount() {
+ return getState();
+ }
+
+ protected int tryAcquireShared(int acquires) {
+ return (getState() == 0) ? 1 : -1;
+ }
+
+ protected boolean tryReleaseShared(int releases) {
+ // Decrement count; signal when transition to zero
+ for (;;) {
+ int c = getState();
+ if (c == 0)
+ return false;
+ int nextc = c - 1;
+ if (compareAndSetState(c, nextc))
+ return nextc == 0;
+ }
+ }
+
+ protected void reset() {
+ setState(startCount);
+ }
+ }
+
+ private final Sync sync;
+
+ /**
+ * Constructs a {@code CountDownLatch} initialized with the given count.
+ *
+ * @param count
+ * the number of times {@link #countDown} must be invoked
+ * before threads can pass through {@link #await}
+ *
+ * @throws IllegalArgumentException
+ * if {@code count} is negative
+ */
+ public CountDownLatch(int count) {
+ if (count < 0) throw new IllegalArgumentException("count < 0");
+ this.sync = new Sync(count);
+ }
+
+ /**
+ * Causes the current thread to wait until the latch has counted down to
+ * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.
+ *
+ * <p>If the current count is zero then this method returns immediately.
+ *
+ * <p>If the current count is greater than zero then the current
+ * thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of two things happen:
+ * <ul>
+ * <li>The count reaches zero due to invocations of the
+ * {@link #countDown} method; or
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread.
+ * </ul>
+ *
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * @throws InterruptedException
+ * if the current thread is interrupted
+ * while waiting
+ */
+ public void await() throws InterruptedException {
+ sync.acquireSharedInterruptibly(1);
+ }
+
+ /**
+ * Causes the current thread to wait until the latch has counted down to
+ * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
+ * or the specified waiting time elapses.
+ *
+ * <p>If the current count is zero then this method returns immediately
+ * with the value {@code true}.
+ *
+ * <p>If the current count is greater than zero then the current
+ * thread becomes disabled for thread scheduling purposes and lies
+ * dormant until one of three things happen:
+ * <ul>
+ * <li>The count reaches zero due to invocations of the
+ * {@link #countDown} method; or
+ * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+ * the current thread; or
+ * <li>The specified waiting time elapses.
+ * </ul>
+ *
+ * <p>If the count reaches zero then the method returns with the
+ * value {@code true}.
+ *
+ * <p>If the current thread:
+ * <ul>
+ * <li>has its interrupted status set on entry to this method; or
+ * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
+ * </ul>
+ * then {@link InterruptedException} is thrown and the current thread's
+ * interrupted status is cleared.
+ *
+ * <p>If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all.
+ *
+ * @param timeout
+ * the maximum time to wait
+ * @param unit
+ * the time unit of the {@code timeout} argument
+ *
+ * @return {@code true} if the count reached zero and {@code false}
+ * if the waiting time elapsed before the count reached zero
+ *
+ * @throws InterruptedException
+ * if the current thread is interrupted
+ * while waiting
+ */
+ public boolean await(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ }
+
+ /**
+ * Decrements the count of the latch, releasing all waiting threads if
+ * the count reaches zero.
+ *
+ * <p>If the current count is greater than zero then it is decremented.
+ * If the new count is zero then all waiting threads are re-enabled for
+ * thread scheduling purposes.
+ *
+ * <p>If the current count equals zero then nothing happens.
+ */
+ public void countDown() {
+ sync.releaseShared(1);
+ }
+
+ /**
+ * Returns the current count.
+ *
+ * <p>This method is typically used for debugging and testing purposes.
+ *
+ * @return the current count
+ */
+ public long getCount() {
+ return sync.getCount();
+ }
+
+ public void reset() {
+ sync.reset();
+ }
+
+ /**
+ * Returns a string identifying this latch, as well as its state.
+ * The state, in brackets, includes the String {@code "Count ="}
+ * followed by the current count.
+ *
+ * @return a string identifying this latch, as well as its state
+ */
+ public String toString() {
+ return super.toString() + "[Count = " + sync.getCount() + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/DataVersion.java b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
new file mode 100644
index 0000000..94fd90b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/DataVersion.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class DataVersion extends RemotingSerializable {
+ private long timestatmp = System.currentTimeMillis();
+ private AtomicLong counter = new AtomicLong(0);
+
+
+ public void assignNewOne(final DataVersion dataVersion) {
+ this.timestatmp = dataVersion.timestatmp;
+ this.counter.set(dataVersion.counter.get());
+ }
+
+
+ public void nextVersion() {
+ this.timestatmp = System.currentTimeMillis();
+ this.counter.incrementAndGet();
+ }
+
+
+ public long getTimestatmp() {
+ return timestatmp;
+ }
+
+
+ public void setTimestatmp(long timestatmp) {
+ this.timestatmp = timestatmp;
+ }
+
+
+ public AtomicLong getCounter() {
+ return counter;
+ }
+
+
+ public void setCounter(AtomicLong counter) {
+ this.counter = counter;
+ }
+
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final DataVersion that = (DataVersion) o;
+
+ if (timestatmp != that.timestatmp) return false;
+ return counter != null ? counter.equals(that.counter) : that.counter == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) (timestatmp ^ (timestatmp >>> 32));
+ result = 31 * result + (counter != null ? counter.hashCode() : 0);
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
new file mode 100644
index 0000000..f53fc27
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+/**
+ * @author shijia.wxr
+ */
+public class MQVersion {
+
+ public static final int CURRENT_VERSION = Version.V4_0_0_SNAPSHOT.ordinal();
+
+
+ public static String getVersionDesc(int value) {
+ try {
+ Version v = Version.values()[value];
+ return v.name();
+ } catch (Exception e) {
+ }
+
+ return "HigherVersion";
+ }
+
+
+ public static Version value2Version(int value) {
+ return Version.values()[value];
+ }
+
+ public enum Version {
+ V3_0_0_SNAPSHOT,
+ V3_0_0_ALPHA1,
+ V3_0_0_BETA1,
+ V3_0_0_BETA2,
+ V3_0_0_BETA3,
+ V3_0_0_BETA4,
+ V3_0_0_BETA5,
+ V3_0_0_BETA6_SNAPSHOT,
+ V3_0_0_BETA6,
+ V3_0_0_BETA7_SNAPSHOT,
+ V3_0_0_BETA7,
+ V3_0_0_BETA8_SNAPSHOT,
+ V3_0_0_BETA8,
+ V3_0_0_BETA9_SNAPSHOT,
+ V3_0_0_BETA9,
+ V3_0_0_FINAL,
+ V3_0_1_SNAPSHOT,
+ V3_0_1,
+ V3_0_2_SNAPSHOT,
+ V3_0_2,
+ V3_0_3_SNAPSHOT,
+ V3_0_3,
+ V3_0_4_SNAPSHOT,
+ V3_0_4,
+ V3_0_5_SNAPSHOT,
+ V3_0_5,
+ V3_0_6_SNAPSHOT,
+ V3_0_6,
+ V3_0_7_SNAPSHOT,
+ V3_0_7,
+ V3_0_8_SNAPSHOT,
+ V3_0_8,
+ V3_0_9_SNAPSHOT,
+ V3_0_9,
+
+ V3_0_10_SNAPSHOT,
+ V3_0_10,
+
+ V3_0_11_SNAPSHOT,
+ V3_0_11,
+
+ V3_0_12_SNAPSHOT,
+ V3_0_12,
+
+ V3_0_13_SNAPSHOT,
+ V3_0_13,
+
+ V3_0_14_SNAPSHOT,
+ V3_0_14,
+
+ V3_0_15_SNAPSHOT,
+ V3_0_15,
+
+ V3_1_0_SNAPSHOT,
+ V3_1_0,
+
+ V3_1_1_SNAPSHOT,
+ V3_1_1,
+
+ V3_1_2_SNAPSHOT,
+ V3_1_2,
+
+ V3_1_3_SNAPSHOT,
+ V3_1_3,
+
+ V3_1_4_SNAPSHOT,
+ V3_1_4,
+
+ V3_1_5_SNAPSHOT,
+ V3_1_5,
+
+ V3_1_6_SNAPSHOT,
+ V3_1_6,
+
+ V3_1_7_SNAPSHOT,
+ V3_1_7,
+
+ V3_1_8_SNAPSHOT,
+ V3_1_8,
+
+ V3_1_9_SNAPSHOT,
+ V3_1_9,
+
+ V3_2_0_SNAPSHOT,
+ V3_2_0,
+
+ V3_2_1_SNAPSHOT,
+ V3_2_1,
+
+ V3_2_2_SNAPSHOT,
+ V3_2_2,
+
+ V3_2_3_SNAPSHOT,
+ V3_2_3,
+
+ V3_2_4_SNAPSHOT,
+ V3_2_4,
+
+ V3_2_5_SNAPSHOT,
+ V3_2_5,
+
+ V3_2_6_SNAPSHOT,
+ V3_2_6,
+
+ V3_2_7_SNAPSHOT,
+ V3_2_7,
+
+ V3_2_8_SNAPSHOT,
+ V3_2_8,
+
+ V3_2_9_SNAPSHOT,
+ V3_2_9,
+
+ V3_3_1_SNAPSHOT,
+ V3_3_1,
+
+ V3_3_2_SNAPSHOT,
+ V3_3_2,
+
+ V3_3_3_SNAPSHOT,
+ V3_3_3,
+
+ V3_3_4_SNAPSHOT,
+ V3_3_4,
+
+ V3_3_5_SNAPSHOT,
+ V3_3_5,
+
+ V3_3_6_SNAPSHOT,
+ V3_3_6,
+
+ V3_3_7_SNAPSHOT,
+ V3_3_7,
+
+ V3_3_8_SNAPSHOT,
+ V3_3_8,
+
+ V3_3_9_SNAPSHOT,
+ V3_3_9,
+
+ V3_4_1_SNAPSHOT,
+ V3_4_1,
+
+ V3_4_2_SNAPSHOT,
+ V3_4_2,
+
+ V3_4_3_SNAPSHOT,
+ V3_4_3,
+
+ V3_4_4_SNAPSHOT,
+ V3_4_4,
+
+ V3_4_5_SNAPSHOT,
+ V3_4_5,
+
+ V3_4_6_SNAPSHOT,
+ V3_4_6,
+
+ V3_4_7_SNAPSHOT,
+ V3_4_7,
+
+ V3_4_8_SNAPSHOT,
+ V3_4_8,
+
+ V3_4_9_SNAPSHOT,
+ V3_4_9,
+ V3_5_1_SNAPSHOT,
+ V3_5_1,
+
+ V3_5_2_SNAPSHOT,
+ V3_5_2,
+
+ V3_5_3_SNAPSHOT,
+ V3_5_3,
+
+ V3_5_4_SNAPSHOT,
+ V3_5_4,
+
+ V3_5_5_SNAPSHOT,
+ V3_5_5,
+
+ V3_5_6_SNAPSHOT,
+ V3_5_6,
+
+ V3_5_7_SNAPSHOT,
+ V3_5_7,
+
+ V3_5_8_SNAPSHOT,
+ V3_5_8,
+
+ V3_5_9_SNAPSHOT,
+ V3_5_9,
+
+ V3_6_1_SNAPSHOT,
+ V3_6_1,
+
+ V3_6_2_SNAPSHOT,
+ V3_6_2,
+
+ V3_6_3_SNAPSHOT,
+ V3_6_3,
+
+ V3_6_4_SNAPSHOT,
+ V3_6_4,
+
+ V3_6_5_SNAPSHOT,
+ V3_6_5,
+
+ V3_6_6_SNAPSHOT,
+ V3_6_6,
+
+ V3_6_7_SNAPSHOT,
+ V3_6_7,
+
+ V3_6_8_SNAPSHOT,
+ V3_6_8,
+
+ V3_6_9_SNAPSHOT,
+ V3_6_9,
+
+ V3_7_1_SNAPSHOT,
+ V3_7_1,
+
+ V3_7_2_SNAPSHOT,
+ V3_7_2,
+
+ V3_7_3_SNAPSHOT,
+ V3_7_3,
+
+ V3_7_4_SNAPSHOT,
+ V3_7_4,
+
+ V3_7_5_SNAPSHOT,
+ V3_7_5,
+
+ V3_7_6_SNAPSHOT,
+ V3_7_6,
+
+ V3_7_7_SNAPSHOT,
+ V3_7_7,
+
+ V3_7_8_SNAPSHOT,
+ V3_7_8,
+
+ V3_7_9_SNAPSHOT,
+ V3_7_9,
+
+ V3_8_1_SNAPSHOT,
+ V3_8_1,
+
+ V3_8_2_SNAPSHOT,
+ V3_8_2,
+
+ V3_8_3_SNAPSHOT,
+ V3_8_3,
+
+ V3_8_4_SNAPSHOT,
+ V3_8_4,
+
+ V3_8_5_SNAPSHOT,
+ V3_8_5,
+
+ V3_8_6_SNAPSHOT,
+ V3_8_6,
+
+ V3_8_7_SNAPSHOT,
+ V3_8_7,
+
+ V3_8_8_SNAPSHOT,
+ V3_8_8,
+
+ V3_8_9_SNAPSHOT,
+ V3_8_9,
+
+ V3_9_1_SNAPSHOT,
+ V3_9_1,
+
+ V3_9_2_SNAPSHOT,
+ V3_9_2,
+
+ V3_9_3_SNAPSHOT,
+ V3_9_3,
+
+ V3_9_4_SNAPSHOT,
+ V3_9_4,
+
+ V3_9_5_SNAPSHOT,
+ V3_9_5,
+
+ V3_9_6_SNAPSHOT,
+ V3_9_6,
+
+ V3_9_7_SNAPSHOT,
+ V3_9_7,
+
+ V3_9_8_SNAPSHOT,
+ V3_9_8,
+
+ V3_9_9_SNAPSHOT,
+ V3_9_9,
+
+ V4_0_0_SNAPSHOT,
+ V4_0_0,
+
+ V4_1_0_SNAPSHOT,
+ V4_1_0,
+
+ V4_2_0_SNAPSHOT,
+ V4_2_0,
+
+ V4_3_0_SNAPSHOT,
+ V4_3_0,
+
+ V4_4_0_SNAPSHOT,
+ V4_4_0,
+
+ V4_5_0_SNAPSHOT,
+ V4_5_0,
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/MixAll.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
new file mode 100644
index 0000000..12fb65a
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.annotation.ImportantField;
+import org.apache.rocketmq.common.help.FAQUrl;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MixAll {
+ public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
+ public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
+ public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
+ public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";
+ public static final String MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel";
+ public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net");
+ public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
+ // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
+ public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
+ public static final String DEFAULT_TOPIC = "TBW102";
+ public static final String BENCHMARK_TOPIC = "BenchmarkTest";
+ public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
+ public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
+ public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
+ public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER";
+ public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER";
+ public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
+ public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP";
+ public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP";
+ public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
+ public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
+ public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY";
+ public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION";
+ public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
+ public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
+ public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";
+
+ public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress();
+ public static final String LOCALHOST = localhost();
+ public static final String DEFAULT_CHARSET = "UTF-8";
+ public static final long MASTER_ID = 0L;
+ public static final long CURRENT_JVM_PID = getPID();
+
+ public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
+
+ public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
+ public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
+ public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
+ public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
+ public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
+
+ public static String getRetryTopic(final String consumerGroup) {
+ return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
+ }
+
+
+ public static boolean isSysConsumerGroup(final String consumerGroup) {
+ return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
+ }
+
+ public static boolean isSystemTopic(final String topic) {
+ return topic.startsWith(SYSTEM_TOPIC_PREFIX);
+ }
+
+ public static String getDLQTopic(final String consumerGroup) {
+ return DLQ_GROUP_TOPIC_PREFIX + consumerGroup;
+ }
+
+
+ public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
+ if (isChange) {
+ String[] ipAndPort = brokerAddr.split(":");
+ String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
+ return brokerAddrNew;
+ } else {
+ return brokerAddr;
+ }
+ }
+
+
+ public static long getPID() {
+ String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
+ if (processName != null && processName.length() > 0) {
+ try {
+ return Long.parseLong(processName.split("@")[0]);
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ return 0;
+ }
+
+
+ public static long createBrokerId(final String ip, final int port) {
+ InetSocketAddress isa = new InetSocketAddress(ip, port);
+ byte[] ipArray = isa.getAddress().getAddress();
+ ByteBuffer bb = ByteBuffer.allocate(8);
+ bb.put(ipArray);
+ bb.putInt(port);
+ long value = bb.getLong(0);
+ return Math.abs(value);
+ }
+
+ public static final void string2File(final String str, final String fileName) throws IOException {
+
+ String tmpFile = fileName + ".tmp";
+ string2FileNotSafe(str, tmpFile);
+
+
+ String bakFile = fileName + ".bak";
+ String prevContent = file2String(fileName);
+ if (prevContent != null) {
+ string2FileNotSafe(prevContent, bakFile);
+ }
+
+
+ File file = new File(fileName);
+ file.delete();
+
+
+ file = new File(tmpFile);
+ file.renameTo(new File(fileName));
+ }
+
+
+ public static final void string2FileNotSafe(final String str, final String fileName) throws IOException {
+ File file = new File(fileName);
+ File fileParent = file.getParentFile();
+ if (fileParent != null) {
+ fileParent.mkdirs();
+ }
+ FileWriter fileWriter = null;
+
+ try {
+ fileWriter = new FileWriter(file);
+ fileWriter.write(str);
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ if (fileWriter != null) {
+ try {
+ fileWriter.close();
+ } catch (IOException e) {
+ throw e;
+ }
+ }
+ }
+ }
+
+
+ public static final String file2String(final String fileName) {
+ File file = new File(fileName);
+ return file2String(file);
+ }
+
+ public static final String file2String(final File file) {
+ if (file.exists()) {
+ char[] data = new char[(int) file.length()];
+ boolean result = false;
+
+ FileReader fileReader = null;
+ try {
+ fileReader = new FileReader(file);
+ int len = fileReader.read(data);
+ result = len == data.length;
+ } catch (IOException e) {
+ // e.printStackTrace();
+ } finally {
+ if (fileReader != null) {
+ try {
+ fileReader.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ if (result) {
+ return new String(data);
+ }
+ }
+ return null;
+ }
+
+ public static final String file2String(final URL url) {
+ InputStream in = null;
+ try {
+ URLConnection urlConnection = url.openConnection();
+ urlConnection.setUseCaches(false);
+ in = urlConnection.getInputStream();
+ int len = in.available();
+ byte[] data = new byte[len];
+ in.read(data, 0, len);
+ return new String(data, "UTF-8");
+ } catch (Exception e) {
+ } finally {
+ if (null != in) {
+ try {
+ in.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ return null;
+ }
+
+ public static String findClassPath(Class<?> c) {
+ URL url = c.getProtectionDomain().getCodeSource().getLocation();
+ return url.getPath();
+ }
+
+
+ public static void printObjectProperties(final Logger log, final Object object) {
+ printObjectProperties(log, object, false);
+ }
+
+
+ public static void printObjectProperties(final Logger log, final Object object, final boolean onlyImportantField) {
+ Field[] fields = object.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ if (!Modifier.isStatic(field.getModifiers())) {
+ String name = field.getName();
+ if (!name.startsWith("this")) {
+ Object value = null;
+ try {
+ field.setAccessible(true);
+ value = field.get(object);
+ if (null == value) {
+ value = "";
+ }
+ } catch (IllegalArgumentException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+
+ if (onlyImportantField) {
+ Annotation annotation = field.getAnnotation(ImportantField.class);
+ if (null == annotation) {
+ continue;
+ }
+ }
+
+ if (log != null) {
+ log.info(name + "=" + value);
+ } else {
+ }
+ }
+ }
+ }
+ }
+
+
+ public static String properties2String(final Properties properties) {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ if (entry.getValue() != null) {
+ sb.append(entry.getKey().toString() + "=" + entry.getValue().toString() + "\n");
+ }
+ }
+ return sb.toString();
+ }
+
+ public static Properties string2Properties(final String str) {
+ Properties properties = new Properties();
+ try {
+ InputStream in = new ByteArrayInputStream(str.getBytes(DEFAULT_CHARSET));
+ properties.load(in);
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ return null;
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+
+ return properties;
+ }
+
+ public static Properties object2Properties(final Object object) {
+ Properties properties = new Properties();
+
+ Field[] fields = object.getClass().getDeclaredFields();
+ for (Field field : fields) {
+ if (!Modifier.isStatic(field.getModifiers())) {
+ String name = field.getName();
+ if (!name.startsWith("this")) {
+ Object value = null;
+ try {
+ field.setAccessible(true);
+ value = field.get(object);
+ } catch (IllegalArgumentException e) {
+ e.printStackTrace();
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+
+ if (value != null) {
+ properties.setProperty(name, value.toString());
+ }
+ }
+ }
+ }
+
+ return properties;
+ }
+
+ public static void properties2Object(final Properties p, final Object object) {
+ Method[] methods = object.getClass().getMethods();
+ for (Method method : methods) {
+ String mn = method.getName();
+ if (mn.startsWith("set")) {
+ try {
+ String tmp = mn.substring(4);
+ String first = mn.substring(3, 4);
+
+ String key = first.toLowerCase() + tmp;
+ String property = p.getProperty(key);
+ if (property != null) {
+ Class<?>[] pt = method.getParameterTypes();
+ if (pt != null && pt.length > 0) {
+ String cn = pt[0].getSimpleName();
+ Object arg = null;
+ if (cn.equals("int") || cn.equals("Integer")) {
+ arg = Integer.parseInt(property);
+ } else if (cn.equals("long") || cn.equals("Long")) {
+ arg = Long.parseLong(property);
+ } else if (cn.equals("double") || cn.equals("Double")) {
+ arg = Double.parseDouble(property);
+ } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+ arg = Boolean.parseBoolean(property);
+ } else if (cn.equals("float") || cn.equals("Float")) {
+ arg = Float.parseFloat(property);
+ } else if (cn.equals("String")) {
+ arg = property;
+ } else {
+ continue;
+ }
+ method.invoke(object, new Object[]{arg});
+ }
+ }
+ } catch (Throwable e) {
+ }
+ }
+ }
+ }
+
+
+ public static boolean isPropertiesEqual(final Properties p1, final Properties p2) {
+ return p1.equals(p2);
+ }
+
+
+ public static List<String> getLocalInetAddress() {
+ List<String> inetAddressList = new ArrayList<String>();
+ try {
+ Enumeration<NetworkInterface> enumeration = NetworkInterface.getNetworkInterfaces();
+ while (enumeration.hasMoreElements()) {
+ NetworkInterface networkInterface = enumeration.nextElement();
+ Enumeration<InetAddress> addrs = networkInterface.getInetAddresses();
+ while (addrs.hasMoreElements()) {
+ inetAddressList.add(addrs.nextElement().getHostAddress());
+ }
+ }
+ } catch (SocketException e) {
+ throw new RuntimeException("get local inet address fail", e);
+ }
+
+ return inetAddressList;
+ }
+
+
+ public static boolean isLocalAddr(String address) {
+ for (String addr : LOCAL_INET_ADDRESS) {
+ if (address.contains(addr))
+ return true;
+ }
+ return false;
+ }
+
+
+ private static String localhost() {
+ try {
+ InetAddress addr = InetAddress.getLocalHost();
+ return addr.getHostAddress();
+ } catch (Throwable e) {
+ throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException"
+ + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION),
+ e);
+ }
+ }
+
+
+ public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
+ long prev = target.get();
+ while (value > prev) {
+ boolean updated = target.compareAndSet(prev, value);
+ if (updated)
+ return true;
+
+ prev = target.get();
+ }
+
+ return false;
+ }
+
+ public static String localhostName() {
+ try {
+ return InetAddress.getLocalHost().getHostName();
+ } catch (Throwable e) {
+ throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException"
+ + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION),
+ e);
+ }
+ }
+
+ public Set<String> list2Set(List<String> values) {
+ Set<String> result = new HashSet<String>();
+ for (String v : values) {
+ result.add(v);
+ }
+ return result;
+ }
+
+ public List<String> set2List(Set<String> values) {
+ List<String> result = new ArrayList<String>();
+ for (String v : values) {
+ result.add(v);
+ }
+ return result;
+ }
+
+ public static String humanReadableByteCount(long bytes, boolean si) {
+ int unit = si ? 1000 : 1024;
+ if (bytes < unit) return bytes + " B";
+ int exp = (int) (Math.log(bytes) / Math.log(unit));
+ String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i");
+ return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/Pair.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/Pair.java b/common/src/main/java/org/apache/rocketmq/common/Pair.java
new file mode 100644
index 0000000..ed6c246
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/Pair.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+/**
+ * @author shijia.wxr
+ */
+public class Pair<T1, T2> {
+ private T1 object1;
+ private T2 object2;
+
+
+ public Pair(T1 object1, T2 object2) {
+ this.object1 = object1;
+ this.object2 = object2;
+ }
+
+
+ public T1 getObject1() {
+ return object1;
+ }
+
+
+ public void setObject1(T1 object1) {
+ this.object1 = object1;
+ }
+
+
+ public T2 getObject2() {
+ return object2;
+ }
+
+
+ public void setObject2(T2 object2) {
+ this.object2 = object2;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ServiceState.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceState.java b/common/src/main/java/org/apache/rocketmq/common/ServiceState.java
new file mode 100644
index 0000000..97f5b90
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/ServiceState.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+/**
+ * @author shijia.wxr
+ */
+public enum ServiceState {
+ /**
+ * Service just created,not start
+ */
+ CREATE_JUST,
+ /**
+ * Service Running
+ */
+ RUNNING,
+ /**
+ * Service shutdown
+ */
+ SHUTDOWN_ALREADY,
+ /**
+ * Service Start failure
+ */
+ START_FAILED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
new file mode 100644
index 0000000..4fd5154
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/ServiceThread.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author shijia.wxr
+ * @author xinyuzhou.zxy
+ */
+public abstract class ServiceThread implements Runnable {
+ private static final Logger STLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ private static final long JOIN_TIME = 90 * 1000;
+
+ protected final Thread thread;
+
+ protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
+
+ protected volatile boolean stopped = false;
+
+ protected final CountDownLatch waitPoint = new CountDownLatch(1);
+
+
+ public ServiceThread() {
+ this.thread = new Thread(this, this.getServiceName());
+ }
+
+
+ public abstract String getServiceName();
+
+
+ public void start() {
+ this.thread.start();
+ }
+
+
+ public void shutdown() {
+ this.shutdown(false);
+ }
+
+ public void shutdown(final boolean interrupt) {
+ this.stopped = true;
+ STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
+
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+
+ try {
+ if (interrupt) {
+ this.thread.interrupt();
+ }
+
+ long beginTime = System.currentTimeMillis();
+ if (!this.thread.isDaemon()) {
+ this.thread.join(this.getJointime());
+ }
+ long eclipseTime = System.currentTimeMillis() - beginTime;
+ STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+ + this.getJointime());
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public long getJointime() {
+ return JOIN_TIME;
+ }
+
+ public void stop() {
+ this.stop(false);
+ }
+
+ public void stop(final boolean interrupt) {
+ this.stopped = true;
+ STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
+
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+
+ if (interrupt) {
+ this.thread.interrupt();
+ }
+ }
+
+ public void makeStop() {
+ this.stopped = true;
+ STLOG.info("makestop thread " + this.getServiceName());
+ }
+
+ public void wakeup() {
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+ }
+
+ protected void waitForRunning(long interval) {
+ if (hasNotified.compareAndSet(true, false)) {
+ this.onWaitEnd();
+ return;
+ }
+
+ //entry to wait
+ waitPoint.reset();
+
+ try {
+ waitPoint.await(interval, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ hasNotified.set(false);
+ this.onWaitEnd();
+ }
+ }
+
+ protected void onWaitEnd() {
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/SystemClock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/SystemClock.java b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java
new file mode 100644
index 0000000..f86a4f5
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/SystemClock.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+/**
+ * @author vintage.wang
+ */
+public class SystemClock {
+ public long now() {
+ return System.currentTimeMillis();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
new file mode 100644
index 0000000..43ab2f2
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/ThreadFactoryImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+public class ThreadFactoryImpl implements ThreadFactory {
+ private final AtomicLong threadIndex = new AtomicLong(0);
+ private final String threadNamePrefix;
+
+
+ public ThreadFactoryImpl(final String threadNamePrefix) {
+ this.threadNamePrefix = threadNamePrefix;
+ }
+
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, threadNamePrefix + this.threadIndex.incrementAndGet());
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
new file mode 100644
index 0000000..1aef5e7
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicConfig.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.common.constant.PermName;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class TopicConfig {
+ private static final String SEPARATOR = " ";
+ public static int defaultReadQueueNums = 16;
+ public static int defaultWriteQueueNums = 16;
+ private String topicName;
+ private int readQueueNums = defaultReadQueueNums;
+ private int writeQueueNums = defaultWriteQueueNums;
+ private int perm = PermName.PERM_READ | PermName.PERM_WRITE;
+ private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
+ private int topicSysFlag = 0;
+ private boolean order = false;
+
+
+ public TopicConfig() {
+ }
+
+
+ public TopicConfig(String topicName) {
+ this.topicName = topicName;
+ }
+
+
+ public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) {
+ this.topicName = topicName;
+ this.readQueueNums = readQueueNums;
+ this.writeQueueNums = writeQueueNums;
+ this.perm = perm;
+ }
+
+
+ public String encode() {
+ StringBuilder sb = new StringBuilder();
+
+ // 1
+ sb.append(this.topicName);
+ sb.append(SEPARATOR);
+
+ // 2
+ sb.append(this.readQueueNums);
+ sb.append(SEPARATOR);
+
+ // 3
+ sb.append(this.writeQueueNums);
+ sb.append(SEPARATOR);
+
+ // 4
+ sb.append(this.perm);
+ sb.append(SEPARATOR);
+
+ // 5
+ sb.append(this.topicFilterType);
+
+ return sb.toString();
+ }
+
+
+ public boolean decode(final String in) {
+ String[] strs = in.split(SEPARATOR);
+ if (strs != null && strs.length == 5) {
+ this.topicName = strs[0];
+
+ this.readQueueNums = Integer.parseInt(strs[1]);
+
+ this.writeQueueNums = Integer.parseInt(strs[2]);
+
+ this.perm = Integer.parseInt(strs[3]);
+
+ this.topicFilterType = TopicFilterType.valueOf(strs[4]);
+
+ return true;
+ }
+
+ return false;
+ }
+
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+
+ public int getReadQueueNums() {
+ return readQueueNums;
+ }
+
+
+ public void setReadQueueNums(int readQueueNums) {
+ this.readQueueNums = readQueueNums;
+ }
+
+
+ public int getWriteQueueNums() {
+ return writeQueueNums;
+ }
+
+
+ public void setWriteQueueNums(int writeQueueNums) {
+ this.writeQueueNums = writeQueueNums;
+ }
+
+
+ public int getPerm() {
+ return perm;
+ }
+
+
+ public void setPerm(int perm) {
+ this.perm = perm;
+ }
+
+
+ public TopicFilterType getTopicFilterType() {
+ return topicFilterType;
+ }
+
+
+ public void setTopicFilterType(TopicFilterType topicFilterType) {
+ this.topicFilterType = topicFilterType;
+ }
+
+
+ public int getTopicSysFlag() {
+ return topicSysFlag;
+ }
+
+
+ public void setTopicSysFlag(int topicSysFlag) {
+ this.topicSysFlag = topicSysFlag;
+ }
+
+
+ public boolean isOrder() {
+ return order;
+ }
+
+
+ public void setOrder(boolean isOrder) {
+ this.order = isOrder;
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final TopicConfig that = (TopicConfig) o;
+
+ if (readQueueNums != that.readQueueNums) return false;
+ if (writeQueueNums != that.writeQueueNums) return false;
+ if (perm != that.perm) return false;
+ if (topicSysFlag != that.topicSysFlag) return false;
+ if (order != that.order) return false;
+ if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false;
+ return topicFilterType == that.topicFilterType;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = topicName != null ? topicName.hashCode() : 0;
+ result = 31 * result + readQueueNums;
+ result = 31 * result + writeQueueNums;
+ result = 31 * result + perm;
+ result = 31 * result + (topicFilterType != null ? topicFilterType.hashCode() : 0);
+ result = 31 * result + topicSysFlag;
+ result = 31 * result + (order ? 1 : 0);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums
+ + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm)
+ + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order="
+ + order + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
new file mode 100644
index 0000000..771fcaf
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/TopicFilterType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+/**
+ * @author shijia.wxr
+ */
+public enum TopicFilterType {
+ SINGLE_TAG,
+ MULTI_TAG
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/de6f9416/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
new file mode 100644
index 0000000..2f9b72e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common;
+
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.text.NumberFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.zip.CRC32;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.InflaterInputStream;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class UtilAll {
+ public static final String YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
+ public static final String YYYY_MM_DD_HH_MM_SS_SSS = "yyyy-MM-dd#HH:mm:ss:SSS";
+ public static final String YYYY_MMDD_HHMMSS = "yyyyMMddHHmmss";
+
+
+ public static int getPid() {
+ RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
+ String name = runtime.getName(); // format: "pid@hostname"
+ try {
+ return Integer.parseInt(name.substring(0, name.indexOf('@')));
+ } catch (Exception e) {
+ return -1;
+ }
+ }
+
+ public static String currentStackTrace() {
+ StringBuilder sb = new StringBuilder();
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ for (StackTraceElement ste : stackTrace) {
+ sb.append("\n\t");
+ sb.append(ste.toString());
+ }
+
+ return sb.toString();
+ }
+
+ public static String offset2FileName(final long offset) {
+ final NumberFormat nf = NumberFormat.getInstance();
+ nf.setMinimumIntegerDigits(20);
+ nf.setMaximumFractionDigits(0);
+ nf.setGroupingUsed(false);
+ return nf.format(offset);
+ }
+
+ public static long computeEclipseTimeMilliseconds(final long beginTime) {
+ return System.currentTimeMillis() - beginTime;
+ }
+
+
+ public static boolean isItTimeToDo(final String when) {
+ String[] whiles = when.split(";");
+ if (whiles != null && whiles.length > 0) {
+ Calendar now = Calendar.getInstance();
+ for (String w : whiles) {
+ int nowHour = Integer.parseInt(w);
+ if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+
+ public static String timeMillisToHumanString() {
+ return timeMillisToHumanString(System.currentTimeMillis());
+ }
+
+
+ public static String timeMillisToHumanString(final long t) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(t);
+ return String.format("%04d%02d%02d%02d%02d%02d%03d", cal.get(Calendar.YEAR), cal.get(Calendar.MONTH) + 1,
+ cal.get(Calendar.DAY_OF_MONTH), cal.get(Calendar.HOUR_OF_DAY), cal.get(Calendar.MINUTE), cal.get(Calendar.SECOND),
+ cal.get(Calendar.MILLISECOND));
+ }
+
+
+ public static long computNextMorningTimeMillis() {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(System.currentTimeMillis());
+ cal.add(Calendar.DAY_OF_MONTH, 1);
+ cal.set(Calendar.HOUR_OF_DAY, 0);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTimeInMillis();
+ }
+
+
+ public static long computNextMinutesTimeMillis() {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(System.currentTimeMillis());
+ cal.add(Calendar.DAY_OF_MONTH, 0);
+ cal.add(Calendar.HOUR_OF_DAY, 0);
+ cal.add(Calendar.MINUTE, 1);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTimeInMillis();
+ }
+
+
+ public static long computNextHourTimeMillis() {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(System.currentTimeMillis());
+ cal.add(Calendar.DAY_OF_MONTH, 0);
+ cal.add(Calendar.HOUR_OF_DAY, 1);
+ cal.set(Calendar.MINUTE, 0);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTimeInMillis();
+ }
+
+
+ public static long computNextHalfHourTimeMillis() {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(System.currentTimeMillis());
+ cal.add(Calendar.DAY_OF_MONTH, 0);
+ cal.add(Calendar.HOUR_OF_DAY, 1);
+ cal.set(Calendar.MINUTE, 30);
+ cal.set(Calendar.SECOND, 0);
+ cal.set(Calendar.MILLISECOND, 0);
+
+ return cal.getTimeInMillis();
+ }
+
+
+ public static String timeMillisToHumanString2(final long t) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(t);
+ return String.format("%04d-%02d-%02d %02d:%02d:%02d,%03d",
+ cal.get(Calendar.YEAR),
+ cal.get(Calendar.MONTH) + 1,
+ cal.get(Calendar.DAY_OF_MONTH),
+ cal.get(Calendar.HOUR_OF_DAY),
+ cal.get(Calendar.MINUTE),
+ cal.get(Calendar.SECOND),
+ cal.get(Calendar.MILLISECOND));
+ }
+
+
+ public static String timeMillisToHumanString3(final long t) {
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(t);
+ return String.format("%04d%02d%02d%02d%02d%02d",
+ cal.get(Calendar.YEAR),
+ cal.get(Calendar.MONTH) + 1,
+ cal.get(Calendar.DAY_OF_MONTH),
+ cal.get(Calendar.HOUR_OF_DAY),
+ cal.get(Calendar.MINUTE),
+ cal.get(Calendar.SECOND));
+ }
+
+
+ public static double getDiskPartitionSpaceUsedPercent(final String path) {
+ if (null == path || path.isEmpty())
+ return -1;
+
+ try {
+ File file = new File(path);
+ if (!file.exists()) {
+ boolean result = file.mkdirs();
+ if (!result) {
+ }
+ }
+
+ long totalSpace = file.getTotalSpace();
+ long freeSpace = file.getFreeSpace();
+ long usedSpace = totalSpace - freeSpace;
+ if (totalSpace > 0) {
+ return usedSpace / (double) totalSpace;
+ }
+ } catch (Exception e) {
+ return -1;
+ }
+
+ return -1;
+ }
+
+
+ public static final int crc32(byte[] array) {
+ if (array != null) {
+ return crc32(array, 0, array.length);
+ }
+
+ return 0;
+ }
+
+
+ public static final int crc32(byte[] array, int offset, int length) {
+ CRC32 crc32 = new CRC32();
+ crc32.update(array, offset, length);
+ return (int) (crc32.getValue() & 0x7FFFFFFF);
+ }
+
+ final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
+
+ public static String bytes2string(byte[] src) {
+ char[] hexChars = new char[src.length * 2];
+ for (int j = 0; j < src.length; j++) {
+ int v = src[j] & 0xFF;
+ hexChars[j * 2] = HEX_ARRAY[v >>> 4];
+ hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
+ }
+ return new String(hexChars);
+ }
+
+ public static byte[] string2bytes(String hexString) {
+ if (hexString == null || hexString.equals("")) {
+ return null;
+ }
+ hexString = hexString.toUpperCase();
+ int length = hexString.length() / 2;
+ char[] hexChars = hexString.toCharArray();
+ byte[] d = new byte[length];
+ for (int i = 0; i < length; i++) {
+ int pos = i * 2;
+ d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
+ }
+ return d;
+ }
+
+
+ private static byte charToByte(char c) {
+ return (byte) "0123456789ABCDEF".indexOf(c);
+ }
+
+
+ public static byte[] uncompress(final byte[] src) throws IOException {
+ byte[] result = src;
+ byte[] uncompressData = new byte[src.length];
+ ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
+ InflaterInputStream inflaterInputStream = new InflaterInputStream(byteArrayInputStream);
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
+
+ try {
+ while (true) {
+ int len = inflaterInputStream.read(uncompressData, 0, uncompressData.length);
+ if (len <= 0) {
+ break;
+ }
+ byteArrayOutputStream.write(uncompressData, 0, len);
+ }
+ byteArrayOutputStream.flush();
+ result = byteArrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ try {
+ byteArrayInputStream.close();
+ } catch (IOException e) {
+ }
+ try {
+ inflaterInputStream.close();
+ } catch (IOException e) {
+ }
+ try {
+ byteArrayOutputStream.close();
+ } catch (IOException e) {
+ }
+ }
+
+ return result;
+ }
+
+
+ public static byte[] compress(final byte[] src, final int level) throws IOException {
+ byte[] result = src;
+ ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
+ java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
+ DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
+ try {
+ deflaterOutputStream.write(src);
+ deflaterOutputStream.finish();
+ deflaterOutputStream.close();
+ result = byteArrayOutputStream.toByteArray();
+ } catch (IOException e) {
+ defeater.end();
+ throw e;
+ } finally {
+ try {
+ byteArrayOutputStream.close();
+ } catch (IOException ignored) {
+ }
+
+ defeater.end();
+ }
+
+ return result;
+ }
+
+
+ public static int asInt(String str, int defaultValue) {
+ try {
+ return Integer.parseInt(str);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+
+
+ public static long asLong(String str, long defaultValue) {
+ try {
+ return Long.parseLong(str);
+ } catch (Exception e) {
+ return defaultValue;
+ }
+ }
+
+
+ public static String formatDate(Date date, String pattern) {
+ SimpleDateFormat df = new SimpleDateFormat(pattern);
+ return df.format(date);
+ }
+
+
+ public static Date parseDate(String date, String pattern) {
+ SimpleDateFormat df = new SimpleDateFormat(pattern);
+ try {
+ return df.parse(date);
+ } catch (ParseException e) {
+ return null;
+ }
+ }
+
+
+ public static String responseCode2String(final int code) {
+ return Integer.toString(code);
+ }
+
+
+ public static String frontStringAtLeast(final String str, final int size) {
+ if (str != null) {
+ if (str.length() > size) {
+ return str.substring(0, size);
+ }
+ }
+
+ return str;
+ }
+
+
+ public static boolean isBlank(String str) {
+ int strLen;
+ if (str == null || (strLen = str.length()) == 0) {
+ return true;
+ }
+ for (int i = 0; i < strLen; i++) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+
+ public static String jstack() {
+ return jstack(Thread.getAllStackTraces());
+ }
+
+
+ public static String jstack(Map<Thread, StackTraceElement[]> map) {
+ StringBuilder result = new StringBuilder();
+ try {
+ Iterator<Map.Entry<Thread, StackTraceElement[]>> ite = map.entrySet().iterator();
+ while (ite.hasNext()) {
+ Map.Entry<Thread, StackTraceElement[]> entry = ite.next();
+ StackTraceElement[] elements = entry.getValue();
+ Thread thread = entry.getKey();
+ if (elements != null && elements.length > 0) {
+ String threadName = entry.getKey().getName();
+ result.append(String.format("%-40sTID: %d STATE: %s%n", threadName, thread.getId(), thread.getState()));
+ for (StackTraceElement el : elements) {
+ result.append(String.format("%-40s%s%n", threadName, el.toString()));
+ }
+ result.append("\n");
+ }
+ }
+ } catch (Throwable e) {
+ result.append(RemotingHelper.exceptionSimpleDesc(e));
+ }
+
+ return result.toString();
+ }
+
+ public static boolean isInternalIP(byte[] ip) {
+ if (ip.length != 4) {
+ throw new RuntimeException("illegal ipv4 bytes");
+ }
+
+
+ //10.0.0.0~10.255.255.255
+ //172.16.0.0~172.31.255.255
+ //192.168.0.0~192.168.255.255
+ if (ip[0] == (byte) 10) {
+
+ return true;
+ } else if (ip[0] == (byte) 172) {
+ if (ip[1] >= (byte) 16 && ip[1] <= (byte) 31) {
+ return true;
+ }
+ } else if (ip[0] == (byte) 192) {
+ if (ip[1] == (byte) 168) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private static boolean ipCheck(byte[] ip) {
+ if (ip.length != 4) {
+ throw new RuntimeException("illegal ipv4 bytes");
+ }
+
+// if (ip[0] == (byte)30 && ip[1] == (byte)10 && ip[2] == (byte)163 && ip[3] == (byte)120) {
+// }
+
+
+ if (ip[0] >= (byte) 1 && ip[0] <= (byte) 126) {
+ if (ip[1] == (byte) 1 && ip[2] == (byte) 1 && ip[3] == (byte) 1) {
+ return false;
+ }
+ if (ip[1] == (byte) 0 && ip[2] == (byte) 0 && ip[3] == (byte) 0) {
+ return false;
+ }
+ return true;
+ } else if (ip[0] >= (byte) 128 && ip[0] <= (byte) 191) {
+ if (ip[2] == (byte) 1 && ip[3] == (byte) 1) {
+ return false;
+ }
+ if (ip[2] == (byte) 0 && ip[3] == (byte) 0) {
+ return false;
+ }
+ return true;
+ } else if (ip[0] >= (byte) 192 && ip[0] <= (byte) 223) {
+ if (ip[3] == (byte) 1) {
+ return false;
+ }
+ if (ip[3] == (byte) 0) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public static String ipToIPv4Str(byte[] ip) {
+ if (ip.length != 4) {
+ return null;
+ }
+ return new StringBuilder().append(ip[0] & 0xFF).append(".").append(
+ ip[1] & 0xFF).append(".").append(ip[2] & 0xFF)
+ .append(".").append(ip[3] & 0xFF).toString();
+ }
+
+ public static byte[] getIP() {
+ try {
+ Enumeration allNetInterfaces = NetworkInterface.getNetworkInterfaces();
+ InetAddress ip = null;
+ byte[] internalIP = null;
+ while (allNetInterfaces.hasMoreElements()) {
+ NetworkInterface netInterface = (NetworkInterface) allNetInterfaces.nextElement();
+ Enumeration addresses = netInterface.getInetAddresses();
+ while (addresses.hasMoreElements()) {
+ ip = (InetAddress) addresses.nextElement();
+ if (ip != null && ip instanceof Inet4Address) {
+ byte[] ipByte = ip.getAddress();
+ if (ipByte.length == 4) {
+ if (ipCheck(ipByte)) {
+ if (!isInternalIP(ipByte)) {
+ return ipByte;
+ } else if (internalIP == null) {
+ internalIP = ipByte;
+ }
+ }
+ }
+ }
+ }
+ }
+ if (internalIP != null) {
+ return internalIP;
+ } else {
+ throw new RuntimeException("Can not get local ip");
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Can not get local ip", e);
+ }
+ }
+}