You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:57 UTC

[11/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp code

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/libs/signature/src/sha512.c
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/libs/signature/src/sha512.c b/rocketmq-cpp/libs/signature/src/sha512.c
new file mode 100755
index 0000000..c64aec6
--- /dev/null
+++ b/rocketmq-cpp/libs/signature/src/sha512.c
@@ -0,0 +1,616 @@
+/* sha512.c - Functions to compute SHA512 and SHA384 message digest of files or
+   memory blocks according to the NIST specification FIPS-180-2.
+
+   Copyright (C) 2005, 2006, 2008 Free Software Foundation, Inc.
+
+   This program is free software: you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation, either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */
+
+/* Written by David Madore, considerably copypasting from
+   Scott G. Miller's sha1.c
+*/
+
+/* #include <config.h> */
+
+#include "sha512.h"
+
+#include <stddef.h>
+#include <string.h>
+
+#if USE_UNLOCKED_IO
+# include "unlocked-io.h"
+#endif
+
+#ifdef __cplusplus
+namespace metaqSignature{
+#endif
+
+#ifdef WORDS_BIGENDIAN
+# define SWAP(n) (n)
+#else
+# define SWAP(n) \
+    u64or (u64or (u64or (u64shl (n, 56),				\
+			 u64shl (u64and (n, u64lo (0x0000ff00)), 40)),	\
+		  u64or (u64shl (u64and (n, u64lo (0x00ff0000)), 24),	\
+			 u64shl (u64and (n, u64lo (0xff000000)),  8))),	\
+	   u64or (u64or (u64and (u64shr (n,  8), u64lo (0xff000000)),	\
+			 u64and (u64shr (n, 24), u64lo (0x00ff0000))),	\
+		  u64or (u64and (u64shr (n, 40), u64lo (0x0000ff00)),	\
+			 u64shr (n, 56))))
+#endif
+
+#define BLOCKSIZE 4096
+#if BLOCKSIZE % 128 != 0
+# error "invalid BLOCKSIZE"
+#endif
+
+/* This array contains the bytes used to pad the buffer to the next
+   128-byte boundary.  */
+static const unsigned char fillbuf[128] = { 0x80, 0 /* , 0, 0, ...  */ };
+
+
+/*
+  Takes a pointer to a 512 bit block of data (eight 64 bit ints) and
+  intializes it to the start constants of the SHA512 algorithm.  This
+  must be called before using hash in the call to sha512_hash
+*/
+void
+sha512_init_ctx (struct sha512_ctx *ctx)
+{
+  ctx->state[0] = u64hilo (0x6a09e667, 0xf3bcc908);
+  ctx->state[1] = u64hilo (0xbb67ae85, 0x84caa73b);
+  ctx->state[2] = u64hilo (0x3c6ef372, 0xfe94f82b);
+  ctx->state[3] = u64hilo (0xa54ff53a, 0x5f1d36f1);
+  ctx->state[4] = u64hilo (0x510e527f, 0xade682d1);
+  ctx->state[5] = u64hilo (0x9b05688c, 0x2b3e6c1f);
+  ctx->state[6] = u64hilo (0x1f83d9ab, 0xfb41bd6b);
+  ctx->state[7] = u64hilo (0x5be0cd19, 0x137e2179);
+
+  ctx->total[0] = ctx->total[1] = u64lo (0);
+  ctx->buflen = 0;
+}
+
+void
+sha384_init_ctx (struct sha512_ctx *ctx)
+{
+  ctx->state[0] = u64hilo (0xcbbb9d5d, 0xc1059ed8);
+  ctx->state[1] = u64hilo (0x629a292a, 0x367cd507);
+  ctx->state[2] = u64hilo (0x9159015a, 0x3070dd17);
+  ctx->state[3] = u64hilo (0x152fecd8, 0xf70e5939);
+  ctx->state[4] = u64hilo (0x67332667, 0xffc00b31);
+  ctx->state[5] = u64hilo (0x8eb44a87, 0x68581511);
+  ctx->state[6] = u64hilo (0xdb0c2e0d, 0x64f98fa7);
+  ctx->state[7] = u64hilo (0x47b5481d, 0xbefa4fa4);
+
+  ctx->total[0] = ctx->total[1] = u64lo (0);
+  ctx->buflen = 0;
+}
+
+/* Copy the value from V into the memory location pointed to by *CP,
+   If your architecture allows unaligned access, this is equivalent to
+   * (__typeof__ (v) *) cp = v  */
+#ifdef WIN32
+static _inline void
+#else
+static __inline__ void
+#endif
+set_uint64 (char *cp, u64 v)
+{
+  memcpy (cp, &v, sizeof v);
+}
+
+/* Put result from CTX in first 64 bytes following RESBUF.
+   The result must be in little endian byte order.  */
+void *
+sha512_read_ctx (const struct sha512_ctx *ctx, void *resbuf)
+{
+  int i;
+  char *r = (char*)resbuf;
+
+  for (i = 0; i < 8; i++)
+    set_uint64 (r + i * sizeof ctx->state[0], SWAP (ctx->state[i]));
+
+  return resbuf;
+}
+
+void *
+sha384_read_ctx (const struct sha512_ctx *ctx, void *resbuf)
+{
+  int i;
+  char *r = (char*)resbuf;
+
+  for (i = 0; i < 6; i++)
+    set_uint64 (r + i * sizeof ctx->state[0], SWAP (ctx->state[i]));
+
+  return resbuf;
+}
+
+/* Process the remaining bytes in the internal buffer and the usual
+   prolog according to the standard and write the result to RESBUF.  */
+static void
+sha512_conclude_ctx (struct sha512_ctx *ctx)
+{
+  /* Take yet unprocessed bytes into account.  */
+  size_t bytes = ctx->buflen;
+  size_t size = (bytes < 112) ? 128 / 8 : 128 * 2 / 8;
+
+  /* Now count remaining bytes.  */
+  ctx->total[0] = u64plus (ctx->total[0], u64lo (bytes));
+  if (u64lt (ctx->total[0], u64lo (bytes)))
+    ctx->total[1] = u64plus (ctx->total[1], u64lo (1));
+
+  /* Put the 128-bit file length in *bits* at the end of the buffer.
+     Use set_uint64 rather than a simple assignment, to avoid risk of
+     unaligned access.  */
+  set_uint64 ((char *) &ctx->buffer[size - 2],
+	      SWAP (u64or (u64shl (ctx->total[1], 3),
+			   u64shr (ctx->total[0], 61))));
+  set_uint64 ((char *) &ctx->buffer[size - 1],
+	      SWAP (u64shl (ctx->total[0], 3)));
+
+  memcpy (&((char *) ctx->buffer)[bytes], fillbuf, (size - 2) * 8 - bytes);
+
+  /* Process last bytes.  */
+  sha512_process_block (ctx->buffer, size * 8, ctx);
+}
+
+void *
+sha512_finish_ctx (struct sha512_ctx *ctx, void *resbuf)
+{
+  sha512_conclude_ctx (ctx);
+  return sha512_read_ctx (ctx, resbuf);
+}
+
+void *
+sha384_finish_ctx (struct sha512_ctx *ctx, void *resbuf)
+{
+  sha512_conclude_ctx (ctx);
+  return sha384_read_ctx (ctx, resbuf);
+}
+
+/* Compute SHA512 message digest for bytes read from STREAM.  The
+   resulting message digest number will be written into the 64 bytes
+   beginning at RESBLOCK.  */
+int
+sha512_stream (FILE *stream, void *resblock)
+{
+  struct sha512_ctx ctx;
+  char buffer[BLOCKSIZE + 72];
+  size_t sum;
+
+  /* Initialize the computation context.  */
+  sha512_init_ctx (&ctx);
+
+  /* Iterate over full file contents.  */
+  while (1)
+    {
+      /* We read the file in blocks of BLOCKSIZE bytes.  One call of the
+	 computation function processes the whole buffer so that with the
+	 next round of the loop another block can be read.  */
+      size_t n;
+      sum = 0;
+
+      /* Read block.  Take care for partial reads.  */
+      while (1)
+	{
+	  n = fread (buffer + sum, 1, BLOCKSIZE - sum, stream);
+
+	  sum += n;
+
+	  if (sum == BLOCKSIZE)
+	    break;
+
+	  if (n == 0)
+	    {
+	      /* Check for the error flag IFF N == 0, so that we don't
+		 exit the loop after a partial read due to e.g., EAGAIN
+		 or EWOULDBLOCK.  */
+	      if (ferror (stream))
+		return 1;
+	      goto process_partial_block;
+	    }
+
+	  /* We've read at least one byte, so ignore errors.  But always
+	     check for EOF, since feof may be true even though N > 0.
+	     Otherwise, we could end up calling fread after EOF.  */
+	  if (feof (stream))
+	    goto process_partial_block;
+	}
+
+      /* Process buffer with BLOCKSIZE bytes.  Note that
+			BLOCKSIZE % 128 == 0
+       */
+      sha512_process_block (buffer, BLOCKSIZE, &ctx);
+    }
+
+ process_partial_block:;
+
+  /* Process any remaining bytes.  */
+  if (sum > 0)
+    sha512_process_bytes (buffer, sum, &ctx);
+
+  /* Construct result in desired memory.  */
+  sha512_finish_ctx (&ctx, resblock);
+  return 0;
+}
+
+/* FIXME: Avoid code duplication */
+int
+sha384_stream (FILE *stream, void *resblock)
+{
+  struct sha512_ctx ctx;
+  char buffer[BLOCKSIZE + 72];
+  size_t sum;
+
+  /* Initialize the computation context.  */
+  sha384_init_ctx (&ctx);
+
+  /* Iterate over full file contents.  */
+  while (1)
+    {
+      /* We read the file in blocks of BLOCKSIZE bytes.  One call of the
+	 computation function processes the whole buffer so that with the
+	 next round of the loop another block can be read.  */
+      size_t n;
+      sum = 0;
+
+      /* Read block.  Take care for partial reads.  */
+      while (1)
+	{
+	  n = fread (buffer + sum, 1, BLOCKSIZE - sum, stream);
+
+	  sum += n;
+
+	  if (sum == BLOCKSIZE)
+	    break;
+
+	  if (n == 0)
+	    {
+	      /* Check for the error flag IFF N == 0, so that we don't
+		 exit the loop after a partial read due to e.g., EAGAIN
+		 or EWOULDBLOCK.  */
+	      if (ferror (stream))
+		return 1;
+	      goto process_partial_block;
+	    }
+
+	  /* We've read at least one byte, so ignore errors.  But always
+	     check for EOF, since feof may be true even though N > 0.
+	     Otherwise, we could end up calling fread after EOF.  */
+	  if (feof (stream))
+	    goto process_partial_block;
+	}
+
+      /* Process buffer with BLOCKSIZE bytes.  Note that
+			BLOCKSIZE % 128 == 0
+       */
+      sha512_process_block (buffer, BLOCKSIZE, &ctx);
+    }
+
+ process_partial_block:;
+
+  /* Process any remaining bytes.  */
+  if (sum > 0)
+    sha512_process_bytes (buffer, sum, &ctx);
+
+  /* Construct result in desired memory.  */
+  sha384_finish_ctx (&ctx, resblock);
+  return 0;
+}
+
+/* Compute SHA512 message digest for LEN bytes beginning at BUFFER.  The
+   result is always in little endian byte order, so that a byte-wise
+   output yields to the wanted ASCII representation of the message
+   digest.  */
+void *
+sha512_buffer (const char *buffer, size_t len, void *resblock)
+{
+  struct sha512_ctx ctx;
+
+  /* Initialize the computation context.  */
+  sha512_init_ctx (&ctx);
+
+  /* Process whole buffer but last len % 128 bytes.  */
+  sha512_process_bytes (buffer, len, &ctx);
+
+  /* Put result in desired memory area.  */
+  return sha512_finish_ctx (&ctx, resblock);
+}
+
+void *
+sha384_buffer (const char *buffer, size_t len, void *resblock)
+{
+  struct sha512_ctx ctx;
+
+  /* Initialize the computation context.  */
+  sha384_init_ctx (&ctx);
+
+  /* Process whole buffer but last len % 128 bytes.  */
+  sha512_process_bytes (buffer, len, &ctx);
+
+  /* Put result in desired memory area.  */
+  return sha384_finish_ctx (&ctx, resblock);
+}
+
+void
+sha512_process_bytes (const void *buffer, size_t len, struct sha512_ctx *ctx)
+{
+  /* When we already have some bits in our internal buffer concatenate
+     both inputs first.  */
+  if (ctx->buflen != 0)
+    {
+      size_t left_over = ctx->buflen;
+      size_t add = 256 - left_over > len ? len : 256 - left_over;
+
+      memcpy (&((char *) ctx->buffer)[left_over], buffer, add);
+      ctx->buflen += add;
+
+      if (ctx->buflen > 128)
+	{
+	  sha512_process_block (ctx->buffer, ctx->buflen & ~127, ctx);
+
+	  ctx->buflen &= 127;
+	  /* The regions in the following copy operation cannot overlap.  */
+	  memcpy (ctx->buffer,
+		  &((char *) ctx->buffer)[(left_over + add) & ~127],
+		  ctx->buflen);
+	}
+
+      buffer = (const char *) buffer + add;
+      len -= add;
+    }
+
+  /* Process available complete blocks.  */
+  if (len >= 128)
+    {
+#if !_STRING_ARCH_unaligned
+# define alignof(type) offsetof (struct { char c; type x; }, x)
+# define UNALIGNED_P(p) (((size_t) p) % alignof (u64) != 0)
+      if (UNALIGNED_P (buffer))
+	while (len > 128)
+	  {
+	    sha512_process_block (memcpy (ctx->buffer, buffer, 128), 128, ctx);
+	    buffer = (const char *) buffer + 128;
+	    len -= 128;
+	  }
+      else
+#endif
+	{
+	  sha512_process_block (buffer, len & ~127, ctx);
+	  buffer = (const char *) buffer + (len & ~127);
+	  len &= 127;
+	}
+    }
+
+  /* Move remaining bytes in internal buffer.  */
+  if (len > 0)
+    {
+      size_t left_over = ctx->buflen;
+
+      memcpy (&((char *) ctx->buffer)[left_over], buffer, len);
+      left_over += len;
+      if (left_over >= 128)
+	{
+	  sha512_process_block (ctx->buffer, 128, ctx);
+	  left_over -= 128;
+	  memcpy (ctx->buffer, &ctx->buffer[16], left_over);
+	}
+      ctx->buflen = left_over;
+    }
+}
+
+/* --- Code below is the primary difference between sha1.c and sha512.c --- */
+
+/* SHA512 round constants */
+#define K(I) sha512_round_constants[I]
+static u64 const sha512_round_constants[80] = {
+  u64init (0x428a2f98, 0xd728ae22), u64init (0x71374491, 0x23ef65cd),
+  u64init (0xb5c0fbcf, 0xec4d3b2f), u64init (0xe9b5dba5, 0x8189dbbc),
+  u64init (0x3956c25b, 0xf348b538), u64init (0x59f111f1, 0xb605d019),
+  u64init (0x923f82a4, 0xaf194f9b), u64init (0xab1c5ed5, 0xda6d8118),
+  u64init (0xd807aa98, 0xa3030242), u64init (0x12835b01, 0x45706fbe),
+  u64init (0x243185be, 0x4ee4b28c), u64init (0x550c7dc3, 0xd5ffb4e2),
+  u64init (0x72be5d74, 0xf27b896f), u64init (0x80deb1fe, 0x3b1696b1),
+  u64init (0x9bdc06a7, 0x25c71235), u64init (0xc19bf174, 0xcf692694),
+  u64init (0xe49b69c1, 0x9ef14ad2), u64init (0xefbe4786, 0x384f25e3),
+  u64init (0x0fc19dc6, 0x8b8cd5b5), u64init (0x240ca1cc, 0x77ac9c65),
+  u64init (0x2de92c6f, 0x592b0275), u64init (0x4a7484aa, 0x6ea6e483),
+  u64init (0x5cb0a9dc, 0xbd41fbd4), u64init (0x76f988da, 0x831153b5),
+  u64init (0x983e5152, 0xee66dfab), u64init (0xa831c66d, 0x2db43210),
+  u64init (0xb00327c8, 0x98fb213f), u64init (0xbf597fc7, 0xbeef0ee4),
+  u64init (0xc6e00bf3, 0x3da88fc2), u64init (0xd5a79147, 0x930aa725),
+  u64init (0x06ca6351, 0xe003826f), u64init (0x14292967, 0x0a0e6e70),
+  u64init (0x27b70a85, 0x46d22ffc), u64init (0x2e1b2138, 0x5c26c926),
+  u64init (0x4d2c6dfc, 0x5ac42aed), u64init (0x53380d13, 0x9d95b3df),
+  u64init (0x650a7354, 0x8baf63de), u64init (0x766a0abb, 0x3c77b2a8),
+  u64init (0x81c2c92e, 0x47edaee6), u64init (0x92722c85, 0x1482353b),
+  u64init (0xa2bfe8a1, 0x4cf10364), u64init (0xa81a664b, 0xbc423001),
+  u64init (0xc24b8b70, 0xd0f89791), u64init (0xc76c51a3, 0x0654be30),
+  u64init (0xd192e819, 0xd6ef5218), u64init (0xd6990624, 0x5565a910),
+  u64init (0xf40e3585, 0x5771202a), u64init (0x106aa070, 0x32bbd1b8),
+  u64init (0x19a4c116, 0xb8d2d0c8), u64init (0x1e376c08, 0x5141ab53),
+  u64init (0x2748774c, 0xdf8eeb99), u64init (0x34b0bcb5, 0xe19b48a8),
+  u64init (0x391c0cb3, 0xc5c95a63), u64init (0x4ed8aa4a, 0xe3418acb),
+  u64init (0x5b9cca4f, 0x7763e373), u64init (0x682e6ff3, 0xd6b2b8a3),
+  u64init (0x748f82ee, 0x5defb2fc), u64init (0x78a5636f, 0x43172f60),
+  u64init (0x84c87814, 0xa1f0ab72), u64init (0x8cc70208, 0x1a6439ec),
+  u64init (0x90befffa, 0x23631e28), u64init (0xa4506ceb, 0xde82bde9),
+  u64init (0xbef9a3f7, 0xb2c67915), u64init (0xc67178f2, 0xe372532b),
+  u64init (0xca273ece, 0xea26619c), u64init (0xd186b8c7, 0x21c0c207),
+  u64init (0xeada7dd6, 0xcde0eb1e), u64init (0xf57d4f7f, 0xee6ed178),
+  u64init (0x06f067aa, 0x72176fba), u64init (0x0a637dc5, 0xa2c898a6),
+  u64init (0x113f9804, 0xbef90dae), u64init (0x1b710b35, 0x131c471b),
+  u64init (0x28db77f5, 0x23047d84), u64init (0x32caab7b, 0x40c72493),
+  u64init (0x3c9ebe0a, 0x15c9bebc), u64init (0x431d67c4, 0x9c100d4c),
+  u64init (0x4cc5d4be, 0xcb3e42b6), u64init (0x597f299c, 0xfc657e2a),
+  u64init (0x5fcb6fab, 0x3ad6faec), u64init (0x6c44198c, 0x4a475817),
+};
+
+/* Round functions.  */
+#define F2(A, B, C) u64or (u64and (A, B), u64and (C, u64or (A, B)))
+#define F1(E, F, G) u64xor (G, u64and (E, u64xor (F, G)))
+
+/* Process LEN bytes of BUFFER, accumulating context into CTX.
+   It is assumed that LEN % 128 == 0.
+   Most of this code comes from GnuPG's cipher/sha1.c.  */
+
+void
+sha512_process_block (const void *buffer, size_t len, struct sha512_ctx *ctx)
+{
+  u64 const *words = (u64 const *)buffer;
+  u64 const *endp = words + len / sizeof (u64);
+  u64 x[16];
+  u64 a = ctx->state[0];
+  u64 b = ctx->state[1];
+  u64 c = ctx->state[2];
+  u64 d = ctx->state[3];
+  u64 e = ctx->state[4];
+  u64 f = ctx->state[5];
+  u64 g = ctx->state[6];
+  u64 h = ctx->state[7];
+
+  /* First increment the byte count.  FIPS PUB 180-2 specifies the possible
+     length of the file up to 2^128 bits.  Here we only compute the
+     number of bytes.  Do a double word increment.  */
+  ctx->total[0] = u64plus (ctx->total[0], u64lo (len));
+  if (u64lt (ctx->total[0], u64lo (len)))
+    ctx->total[1] = u64plus (ctx->total[1], u64lo (1));
+
+#define S0(x) u64xor (u64rol(x, 63), u64xor (u64rol (x, 56), u64shr (x, 7)))
+#define S1(x) u64xor (u64rol (x, 45), u64xor (u64rol (x, 3), u64shr (x, 6)))
+#define SS0(x) u64xor (u64rol (x, 36), u64xor (u64rol (x, 30), u64rol (x, 25)))
+#define SS1(x) u64xor (u64rol(x, 50), u64xor (u64rol (x, 46), u64rol (x, 23)))
+
+#define M(I) (x[(I) & 15]						  \
+	      = u64plus (x[(I) & 15],					  \
+			 u64plus (S1 (x[((I) - 2) & 15]),		  \
+				  u64plus (x[((I) - 7) & 15],		  \
+					   S0 (x[((I) - 15) & 15])))))
+
+#define R(A, B, C, D, E, F, G, H, K, M)					  \
+  do									  \
+    {									  \
+      u64 t0 = u64plus (SS0 (A), F2 (A, B, C));				  \
+      u64 t1 =								  \
+	u64plus (H, u64plus (SS1 (E),					  \
+			     u64plus (F1 (E, F, G), u64plus (K, M))));	  \
+      D = u64plus (D, t1);						  \
+      H = u64plus (t0, t1);						  \
+    }									  \
+  while (0)
+
+  while (words < endp)
+    {
+      int t;
+      /* FIXME: see sha1.c for a better implementation.  */
+      for (t = 0; t < 16; t++)
+	{
+	  x[t] = SWAP (*words);
+	  words++;
+	}
+
+      R( a, b, c, d, e, f, g, h, K( 0), x[ 0] );
+      R( h, a, b, c, d, e, f, g, K( 1), x[ 1] );
+      R( g, h, a, b, c, d, e, f, K( 2), x[ 2] );
+      R( f, g, h, a, b, c, d, e, K( 3), x[ 3] );
+      R( e, f, g, h, a, b, c, d, K( 4), x[ 4] );
+      R( d, e, f, g, h, a, b, c, K( 5), x[ 5] );
+      R( c, d, e, f, g, h, a, b, K( 6), x[ 6] );
+      R( b, c, d, e, f, g, h, a, K( 7), x[ 7] );
+      R( a, b, c, d, e, f, g, h, K( 8), x[ 8] );
+      R( h, a, b, c, d, e, f, g, K( 9), x[ 9] );
+      R( g, h, a, b, c, d, e, f, K(10), x[10] );
+      R( f, g, h, a, b, c, d, e, K(11), x[11] );
+      R( e, f, g, h, a, b, c, d, K(12), x[12] );
+      R( d, e, f, g, h, a, b, c, K(13), x[13] );
+      R( c, d, e, f, g, h, a, b, K(14), x[14] );
+      R( b, c, d, e, f, g, h, a, K(15), x[15] );
+      R( a, b, c, d, e, f, g, h, K(16), M(16) );
+      R( h, a, b, c, d, e, f, g, K(17), M(17) );
+      R( g, h, a, b, c, d, e, f, K(18), M(18) );
+      R( f, g, h, a, b, c, d, e, K(19), M(19) );
+      R( e, f, g, h, a, b, c, d, K(20), M(20) );
+      R( d, e, f, g, h, a, b, c, K(21), M(21) );
+      R( c, d, e, f, g, h, a, b, K(22), M(22) );
+      R( b, c, d, e, f, g, h, a, K(23), M(23) );
+      R( a, b, c, d, e, f, g, h, K(24), M(24) );
+      R( h, a, b, c, d, e, f, g, K(25), M(25) );
+      R( g, h, a, b, c, d, e, f, K(26), M(26) );
+      R( f, g, h, a, b, c, d, e, K(27), M(27) );
+      R( e, f, g, h, a, b, c, d, K(28), M(28) );
+      R( d, e, f, g, h, a, b, c, K(29), M(29) );
+      R( c, d, e, f, g, h, a, b, K(30), M(30) );
+      R( b, c, d, e, f, g, h, a, K(31), M(31) );
+      R( a, b, c, d, e, f, g, h, K(32), M(32) );
+      R( h, a, b, c, d, e, f, g, K(33), M(33) );
+      R( g, h, a, b, c, d, e, f, K(34), M(34) );
+      R( f, g, h, a, b, c, d, e, K(35), M(35) );
+      R( e, f, g, h, a, b, c, d, K(36), M(36) );
+      R( d, e, f, g, h, a, b, c, K(37), M(37) );
+      R( c, d, e, f, g, h, a, b, K(38), M(38) );
+      R( b, c, d, e, f, g, h, a, K(39), M(39) );
+      R( a, b, c, d, e, f, g, h, K(40), M(40) );
+      R( h, a, b, c, d, e, f, g, K(41), M(41) );
+      R( g, h, a, b, c, d, e, f, K(42), M(42) );
+      R( f, g, h, a, b, c, d, e, K(43), M(43) );
+      R( e, f, g, h, a, b, c, d, K(44), M(44) );
+      R( d, e, f, g, h, a, b, c, K(45), M(45) );
+      R( c, d, e, f, g, h, a, b, K(46), M(46) );
+      R( b, c, d, e, f, g, h, a, K(47), M(47) );
+      R( a, b, c, d, e, f, g, h, K(48), M(48) );
+      R( h, a, b, c, d, e, f, g, K(49), M(49) );
+      R( g, h, a, b, c, d, e, f, K(50), M(50) );
+      R( f, g, h, a, b, c, d, e, K(51), M(51) );
+      R( e, f, g, h, a, b, c, d, K(52), M(52) );
+      R( d, e, f, g, h, a, b, c, K(53), M(53) );
+      R( c, d, e, f, g, h, a, b, K(54), M(54) );
+      R( b, c, d, e, f, g, h, a, K(55), M(55) );
+      R( a, b, c, d, e, f, g, h, K(56), M(56) );
+      R( h, a, b, c, d, e, f, g, K(57), M(57) );
+      R( g, h, a, b, c, d, e, f, K(58), M(58) );
+      R( f, g, h, a, b, c, d, e, K(59), M(59) );
+      R( e, f, g, h, a, b, c, d, K(60), M(60) );
+      R( d, e, f, g, h, a, b, c, K(61), M(61) );
+      R( c, d, e, f, g, h, a, b, K(62), M(62) );
+      R( b, c, d, e, f, g, h, a, K(63), M(63) );
+      R( a, b, c, d, e, f, g, h, K(64), M(64) );
+      R( h, a, b, c, d, e, f, g, K(65), M(65) );
+      R( g, h, a, b, c, d, e, f, K(66), M(66) );
+      R( f, g, h, a, b, c, d, e, K(67), M(67) );
+      R( e, f, g, h, a, b, c, d, K(68), M(68) );
+      R( d, e, f, g, h, a, b, c, K(69), M(69) );
+      R( c, d, e, f, g, h, a, b, K(70), M(70) );
+      R( b, c, d, e, f, g, h, a, K(71), M(71) );
+      R( a, b, c, d, e, f, g, h, K(72), M(72) );
+      R( h, a, b, c, d, e, f, g, K(73), M(73) );
+      R( g, h, a, b, c, d, e, f, K(74), M(74) );
+      R( f, g, h, a, b, c, d, e, K(75), M(75) );
+      R( e, f, g, h, a, b, c, d, K(76), M(76) );
+      R( d, e, f, g, h, a, b, c, K(77), M(77) );
+      R( c, d, e, f, g, h, a, b, K(78), M(78) );
+      R( b, c, d, e, f, g, h, a, K(79), M(79) );
+
+      a = ctx->state[0] = u64plus (ctx->state[0], a);
+      b = ctx->state[1] = u64plus (ctx->state[1], b);
+      c = ctx->state[2] = u64plus (ctx->state[2], c);
+      d = ctx->state[3] = u64plus (ctx->state[3], d);
+      e = ctx->state[4] = u64plus (ctx->state[4], e);
+      f = ctx->state[5] = u64plus (ctx->state[5], f);
+      g = ctx->state[6] = u64plus (ctx->state[6], g);
+      h = ctx->state[7] = u64plus (ctx->state[7], h);
+   }
+}
+
+#ifdef __cplusplus
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/libs/signature/src/spas_client.c
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/libs/signature/src/spas_client.c b/rocketmq-cpp/libs/signature/src/spas_client.c
new file mode 100755
index 0000000..08b5a2d
--- /dev/null
+++ b/rocketmq-cpp/libs/signature/src/spas_client.c
@@ -0,0 +1,508 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "spas_client.h"
+#include "sha1.h"
+#include "sha256.h"
+#include "hmac.h"
+#include "base64.h"
+
+#ifdef WIN32
+#include <io.h>  
+#include <process.h>  
+#else
+#include <unistd.h>
+#endif
+#ifdef SPAS_MT
+#include <pthread.h>
+#endif
+
+#ifdef __cplusplus
+namespace metaqSignature{
+#endif
+
+#define SPAS_VERSION "SPAS_V1_0"
+
+static SPAS_CREDENTIAL g_credential;
+static char g_path[SPAS_MAX_PATH];
+static int g_loaded = 0;
+static unsigned int refresh = 10;
+static time_t modified = 0;
+
+#ifdef SPAS_MT
+
+static pthread_mutex_t cred_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_once_t cred_once = PTHREAD_ONCE_INIT;
+static pthread_key_t cred_key;
+
+#endif
+
+extern void * _mem_alloc(unsigned int size);
+extern void * _mem_realloc(void *ptr, unsigned int old_size, unsigned int new_size);
+extern void _mem_free(void *ptr);
+extern void _trim(char *str);
+
+
+void * _mem_alloc(unsigned int size) {
+	void *p = malloc(size);
+	if (p != NULL) {
+		memset(p, 0, size);
+	}
+	return p;
+}
+
+void * _mem_realloc(void *ptr, unsigned int old_size, unsigned int new_size) {
+	void *p = realloc(ptr, new_size);
+	if (p != NULL && new_size > old_size) {
+		memset((unsigned int *)p + old_size, 0, new_size - old_size);
+	}
+	return p;
+}
+
+
+void _mem_free(void *ptr) {
+	free(ptr);
+}
+
+void _trim(char *str) {
+	int len = strlen(str);
+	int i;
+	int done = 0;
+	for (i = len - 1; i >= 0; i--) {
+		switch (str[i]) {
+		case ' ':
+		case '\t':
+		case '\r':
+		case '\n':
+			str[i] = '\0';
+			break;
+		default:
+			done = 1;
+			break;
+		}
+		if (done) {
+			break;
+		}
+	}
+}
+
+static int _load_credential(SPAS_CREDENTIAL *pcred, char *path) {
+	FILE *fp = NULL;
+	char buf[SPAS_MAX_KEY_LEN * 2];
+	if (pcred == NULL || path == NULL) {
+		return ERROR_INVALID_PARAM;
+	}
+	fp = fopen(path, "r");
+	if (fp == NULL) {
+		return ERROR_FILE_OPEN;
+	}
+	memset(pcred, 0, sizeof(SPAS_CREDENTIAL));
+	while (fgets(buf, sizeof(buf), fp)) {
+		_trim(buf);
+		int len = strlen(SPAS_ACCESS_KEY_TAG);
+		if (strncmp(buf, SPAS_ACCESS_KEY_TAG, len) == 0 && buf[len] == '=') {
+			strncpy(pcred->access_key, buf + len + 1, SPAS_MAX_KEY_LEN - 1);
+		}
+		else {
+			len = strlen(SPAS_SECRET_KEY_TAG);
+			if (strncmp(buf, SPAS_SECRET_KEY_TAG, len) == 0 && buf[len] == '=') {
+				strncpy(pcred->secret_key, buf + len + 1, SPAS_MAX_KEY_LEN - 1);
+			}
+		}
+	}
+	fclose(fp);
+	if (strlen(pcred->access_key) == 0 || strlen(pcred->secret_key) == 0) {
+		return ERROR_MISSING_KEY;
+	}
+	return NO_ERROR;
+}
+
+#ifndef WIN32
+static void _reload_credential(int sig) {
+	int ret;
+	SPAS_CREDENTIAL credential;
+	struct stat status;
+	struct sigaction act;
+	
+	if (sig != SIGALRM) {
+		return;
+	}
+	
+	memset(&act, 0, sizeof(act));
+	act.sa_handler = _reload_credential;
+	sigaction(SIGALRM, &act, NULL);
+	alarm(refresh);
+	if (g_path[0] != '\0') {
+		ret = stat(g_path, &status);
+		if (ret != 0) {
+			return;
+		}
+		if (status.st_mtime == modified) {
+			return;
+		}
+		ret = _load_credential(&credential, g_path);
+		if (ret != NO_ERROR) {
+			return;
+		}
+#ifdef SPAS_MT
+		pthread_mutex_lock(&cred_mutex);
+#endif
+		memcpy(&g_credential, &credential, sizeof(SPAS_CREDENTIAL));
+#ifdef SPAS_MT
+		pthread_mutex_unlock(&cred_mutex);
+#endif
+		modified = status.st_mtime;
+	}
+}
+
+static int _update_credential_by_alarm() {
+	struct sigaction act;
+	
+	memset(&act, 0, sizeof(act));
+	act.sa_handler = _reload_credential;
+	sigaction(SIGALRM, &act, NULL);
+	alarm(refresh);
+	return NO_ERROR;
+}
+#endif 
+
+#ifdef SPAS_MT
+
+static void * _update_credential_entry(void *arg) {
+	int ret;
+	SPAS_CREDENTIAL credential;
+	struct stat status;
+	struct timeval tv;
+	while (1) {
+		tv.tv_sec = refresh;
+		tv.tv_usec = 0;
+		select(0, NULL, NULL, NULL, &tv);
+		if (g_path[0] != '\0') {
+			ret = stat(g_path, &status);
+			if (ret != 0) {
+				continue;
+			}
+			if (status.st_mtime == modified) {
+				continue;
+			}
+			ret = _load_credential(&credential, g_path);
+			if (ret != NO_ERROR) {
+				continue;
+			}
+			pthread_mutex_lock(&cred_mutex);
+			memcpy(&g_credential, &credential, sizeof(SPAS_CREDENTIAL));
+			pthread_mutex_unlock(&cred_mutex);
+			modified = status.st_mtime;
+		}
+	}
+	return NULL;
+}
+
+static int _update_credential_by_thread() {
+	pthread_t tid;
+	int ret;
+
+	ret = pthread_create(&tid, NULL, _update_credential_entry, NULL);
+	if (ret != 0) {
+		return ERROR_UPDATE_CREDENTIAL;
+	}
+	pthread_detach(tid);
+	return NO_ERROR;
+}
+
+
+
+int spas_load_credential(char *path, CREDENTIAL_UPDATE_MODE mode) {
+	int ret = NO_ERROR;
+	SPAS_CREDENTIAL credential;
+	
+	if (g_loaded) {
+		return NO_ERROR;
+	}
+	if (path == NULL) {
+		path = getenv(SPAS_CREDENTIAL_ENV);
+		if (path == NULL) {
+			return ERROR_NO_CREDENTIAL;
+		}
+	}
+	strncpy(g_path, path, SPAS_MAX_PATH - 1);
+	ret = _load_credential(&credential, path);
+	if (ret != NO_ERROR) {
+		return ret;
+	}
+#ifdef SPAS_MT
+	pthread_mutex_lock(&cred_mutex);
+#endif
+	if (!g_loaded) {
+		memcpy(&g_credential, &credential, sizeof(SPAS_CREDENTIAL));
+		g_loaded = 1;
+	}
+#ifdef SPAS_MT
+	pthread_mutex_unlock(&cred_mutex);
+#endif
+	switch (mode) {
+	case UPDATE_BY_ALARM:
+		ret = _update_credential_by_alarm();
+		break;
+#ifdef SPAS_MT
+	case UPDATE_BY_THREAD:
+		ret = _update_credential_by_thread();
+		break;
+#endif
+	case NO_UPDATE:
+	default:
+		ret = NO_ERROR;
+		break;
+	}
+	return ret;
+}
+
+#endif
+
+SPAS_CREDENTIAL * spas_get_credential(void) {
+	SPAS_CREDENTIAL *credential = (SPAS_CREDENTIAL *)_mem_alloc(sizeof(SPAS_CREDENTIAL));
+	if (credential != NULL) {
+#ifdef SPAS_MT
+		pthread_mutex_lock(&cred_mutex);
+#endif
+		memcpy(credential, &g_credential, sizeof(SPAS_CREDENTIAL));
+#ifdef SPAS_MT
+		pthread_mutex_unlock(&cred_mutex);
+#endif
+	}
+	return credential;
+}
+
+
+int spas_set_access_key(char *key) {
+	int len = 0;
+	if (key == NULL) {
+		return ERROR_INVALID_PARAM;
+	}
+	len = strlen(key);
+	if (len == 0 || len >= SPAS_MAX_KEY_LEN) {
+		return ERROR_KEY_LENGTH;
+	}
+#ifdef SPAS_MT
+	pthread_mutex_lock(&cred_mutex);
+#endif
+	memcpy(g_credential.access_key, key, len + 1);
+#ifdef SPAS_MT
+	pthread_mutex_unlock(&cred_mutex);
+#endif
+	return NO_ERROR;
+}
+
+int spas_set_secret_key(char *key) {
+	int len = 0;
+	if (key == NULL) {
+		return ERROR_INVALID_PARAM;
+	}
+	len = strlen(key);
+	if (len == 0 || len >= SPAS_MAX_KEY_LEN) {
+		return ERROR_KEY_LENGTH;
+	}
+#ifdef SPAS_MT
+	pthread_mutex_lock(&cred_mutex);
+#endif
+	memcpy(g_credential.secret_key, key, len + 1);
+#ifdef SPAS_MT
+	pthread_mutex_unlock(&cred_mutex);
+#endif
+	return NO_ERROR;
+}
+
+char * spas_get_access_key() {
+	return g_credential.access_key;
+}
+
+char * spas_get_secret_key() {
+	return g_credential.secret_key;
+}
+
+#ifdef SPAS_MT
+
+static void _free_thread_credential(void* credential)
+{
+	if (credential != NULL) {
+		_mem_free(credential);
+	}
+}
+
+static void _init_credential_key(void) {
+	pthread_key_create(&cred_key, _free_thread_credential);
+}
+
+static SPAS_CREDENTIAL * _get_thread_credential(void) {
+	int ret = 0;
+	SPAS_CREDENTIAL *credential = NULL;
+	ret = pthread_once(&cred_once, _init_credential_key);
+	if (ret != 0) {
+		return NULL;
+	}
+	credential = pthread_getspecific(cred_key);
+	if (credential == NULL) {
+		credential = _mem_alloc(sizeof(SPAS_CREDENTIAL));
+		if (credential == NULL) {
+			return NULL;
+		}
+		ret = pthread_setspecific(cred_key, credential);
+		if (ret != 0) {
+			_mem_free(credential);
+			return NULL;
+		}
+	}
+	return credential;
+}
+
+int spas_load_thread_credential(char *path) {
+	int ret = NO_ERROR;
+	SPAS_CREDENTIAL * credential = NULL;
+	credential = _get_thread_credential();
+	if (credential == NULL) {
+		return ERROR_MEM_ALLOC;
+	}
+	ret = _load_credential(credential, path);
+	if (ret != NO_ERROR) {
+		memset(credential, 0, sizeof(SPAS_CREDENTIAL));
+		return ret;
+	}
+	return NO_ERROR;
+}
+
+int spas_set_thread_access_key(char *key) {
+	int len = 0;
+	SPAS_CREDENTIAL * credential = NULL;
+	if (key == NULL) {
+		return ERROR_INVALID_PARAM;
+	}
+	len = strlen(key);
+	if (len == 0 || len >= SPAS_MAX_KEY_LEN) {
+		return ERROR_KEY_LENGTH;
+	}
+	credential = _get_thread_credential();
+	if (credential == NULL) {
+		return ERROR_MEM_ALLOC;
+	}
+	memcpy(credential->access_key, key, len + 1);
+	return NO_ERROR;
+}
+
+int spas_set_thread_secret_key(char *key) {
+	int len = 0;
+	SPAS_CREDENTIAL * credential = NULL;
+	if (key == NULL) {
+		return ERROR_INVALID_PARAM;
+	}
+	len = strlen(key);
+	if (len == 0 || len >= SPAS_MAX_KEY_LEN) {
+		return ERROR_KEY_LENGTH;
+	}
+	credential = _get_thread_credential();
+	if (credential == NULL) {
+		return ERROR_MEM_ALLOC;
+	}
+	memcpy(credential->secret_key, key, len + 1);
+	return NO_ERROR;
+
+}
+
+char * spas_get_thread_access_key(void) {
+	SPAS_CREDENTIAL * credential = _get_thread_credential();
+	if (credential == NULL) {
+		return NULL;
+	}
+	return credential->access_key;
+}
+
+char * spas_get_thread_secret_key(void) {
+	SPAS_CREDENTIAL * credential = _get_thread_credential();
+	if (credential == NULL) {
+		return NULL;
+	}
+	return credential->secret_key;
+}
+
+#endif
+
+
+char * spas_get_signature(const SPAS_PARAM_LIST *list, const char *key) {
+	return spas_get_signature2(list, key, SIGN_HMACSHA1);
+}
+
+char * spas_get_signature2(const SPAS_PARAM_LIST *list, const char *key, SPAS_SIGN_ALGORITHM algorithm) {
+	char *sign = NULL;
+	char *data = NULL;
+	if (list == NULL || key == NULL) {
+		return NULL;
+	}
+	data = param_list_to_str(list);
+	if (data == NULL) {
+		return NULL;
+	}
+	sign = spas_sign2(data, strlen(data),key, algorithm);
+	_mem_free(data);
+	return sign;
+}
+
+char * spas_sign(const char *data, size_t size, const char *key) {
+	return spas_sign2(data, size, key, SIGN_HMACSHA1);
+}
+
+char * spas_sign2(const char *data, size_t size, const char *key, SPAS_SIGN_ALGORITHM algorithm) {
+	int ret;
+	int dsize = 0;
+	char *sha_buf = NULL;
+	char *base64_ret = NULL;
+	if (data == NULL || key == NULL) {
+		return NULL;
+	}
+	if (algorithm == SIGN_HMACSHA1) {
+		dsize = SHA1_DIGEST_SIZE;
+		sha_buf = (char *)_mem_alloc(dsize + 1);
+		if (sha_buf == NULL) {
+			return NULL;
+		}
+		ret = hmac_sha1(key, strlen(key), data, size, sha_buf);
+		if (ret < 0) {
+			_mem_free(sha_buf);
+			return NULL;
+		}
+	}
+	else if (algorithm == SIGN_HMACSHA256) {
+		dsize = SHA256_DIGEST_SIZE;
+		sha_buf = (char *)_mem_alloc(dsize + 1);
+		if (sha_buf == NULL) {
+			return NULL;
+		}
+		ret = hmac_sha256(key, strlen(key), data, strlen(data), sha_buf);
+		if (ret < 0) {
+			_mem_free(sha_buf);
+			return NULL;
+		}
+	}
+	else {
+		return NULL;
+	}
+	ret = base64_encode_alloc(sha_buf, dsize, &base64_ret);
+	_mem_free(sha_buf);
+	return base64_ret;
+	
+}
+
+void    spas_mem_free(char *pSignature)
+{
+	_mem_free(pSignature);
+}
+
+char * spas_get_version(void) {
+	return SPAS_VERSION;
+}
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/project/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/project/CMakeLists.txt b/rocketmq-cpp/project/CMakeLists.txt
new file mode 100755
index 0000000..02723af
--- /dev/null
+++ b/rocketmq-cpp/project/CMakeLists.txt
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# source files
+project(rocketmq4cpp)
+
+file(GLOB_RECURSE SRC_FILES   ${CMAKE_SOURCE_DIR}/src/*)
+list(REMOVE_ITEM  SRC_FILES   ${CMAKE_SOURCE_DIR}/src/dllmain.cpp)
+
+# subdirs
+SET(SUB_DIRS)
+file(GLOB children ${CMAKE_SOURCE_DIR}/src/*)
+FOREACH(child ${children})
+	IF(IS_DIRECTORY ${child})
+	    LIST(APPEND SUB_DIRS ${child})
+	ENDIF()
+ENDFOREACH()
+LIST(APPEND SUB_DIRS ${CMAKE_SOURCE_DIR}/src)
+
+include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${SUB_DIRS})
+
+# libs_directories
+file(GLOB LIB_DIRS ${CMAKE_SOURCE_DIR}/libs/*)
+foreach(dir ${LIB_DIRS})
+    if(IS_DIRECTORY ${dir})
+        set(CMAKE_PREFIX_PATH ${CMAKE_PREFIX_PATH};${dir})
+        include_directories(${dir}/include)
+    endif()
+endforeach()
+
+# static
+add_library(rocketmq_static STATIC ${SRC_FILES})
+set_target_properties(rocketmq_static PROPERTIES OUTPUT_NAME "rocketmq")
+add_dependencies(rocketmq_static Signature)
+target_link_libraries(rocketmq_static ${deplibs})
+target_link_libraries(rocketmq_static Signature)
+
+# shared
+set(CMAKE_SHARED_LINKER_FLAGS "-DBOOST_ALL_DYN_LINK -shared")
+add_library(rocketmq_shared SHARED ${SRC_FILES})
+set_target_properties(rocketmq_shared PROPERTIES OUTPUT_NAME "rocketmq")
+add_dependencies(rocketmq_shared Signature)
+target_link_libraries(rocketmq_shared ${deplibs})
+target_link_libraries(rocketmq_shared  Signature)
+
+# install
+install (TARGETS   rocketmq_static             DESTINATION bin)
+install (DIRECTORY ${CMAKE_SOURCE_DIR}/include/ DESTINATION include)
+install (DIRECTORY ${CMAKE_SOURCE_DIR}/doc/     DESTINATION doc)
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/project/tool.mak
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/project/tool.mak b/rocketmq-cpp/project/tool.mak
new file mode 100644
index 0000000..e8f57fc
--- /dev/null
+++ b/rocketmq-cpp/project/tool.mak
@@ -0,0 +1,21 @@
+define BUILD_LIBRARY
+$(if $(wildcard $@),@$(RM) $@)
+$(if $(wildcard ar.mac),@$(RM) ar.mac)
+$(if $(filter %.a, $^),
+@echo CREATE $@ > ar.mac
+@echo SAVE >> ar.mac
+@echo END >> ar.mac
+@$(AR) -M < ar.mac
+)
+$(if $(filter %.o,$^),@$(AR) -q $@ $(filter %.o, $^))
+$(if $(filter %.a, $^),
+@echo OPEN $@ > ar.mac
+$(foreach LIB, $(filter %.a, $^),
+@echo ADDLIB $(LIB) >> ar.mac
+)
+@echo SAVE >> ar.mac
+@echo END >> ar.mac
+@$(AR) -M < ar.mac
+@$(RM) ar.mac
+)
+endef
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/MQClientAPIImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/MQClientAPIImpl.cpp b/rocketmq-cpp/src/MQClientAPIImpl.cpp
new file mode 100755
index 0000000..2d9c39c
--- /dev/null
+++ b/rocketmq-cpp/src/MQClientAPIImpl.cpp
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MQClientAPIImpl.h"
+#include <assert.h>
+#include <fstream>
+#include "CommunicationMode.h"
+#include "Logging.h"
+#include "MQDecoder.h"
+#include "PullResultExt.h"
+
+namespace rocketmq {
+//<!************************************************************************
+MQClientAPIImpl::MQClientAPIImpl(
+    const string& mqClientId, ClientRemotingProcessor* clientRemotingProcessor, int pullThreadNum,
+    uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout,
+    string unitName)
+    : m_firstFetchNameSrv(true), m_mqClientId(mqClientId) {
+  m_pRemotingClient.reset(new TcpRemotingClient(
+      pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout));
+  m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE,
+                                       clientRemotingProcessor);
+  m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET,
+                                       clientRemotingProcessor);
+  m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT,
+                                       clientRemotingProcessor);
+  m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO,
+                                       clientRemotingProcessor);
+  m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED,
+                                       clientRemotingProcessor);
+  m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY,
+                                       clientRemotingProcessor);
+
+  m_topAddressing.reset(new TopAddressing(unitName));
+}
+
+MQClientAPIImpl::~MQClientAPIImpl() {
+  m_pRemotingClient = NULL;
+  m_topAddressing = NULL;
+}
+
+void MQClientAPIImpl::stopAllTcpTransportThread() {
+  m_pRemotingClient->stopAllTcpTransportThread();
+}
+
+bool MQClientAPIImpl::writeDataToFile(string filename, string data,
+                                      bool isSync) {
+  if (data.size() == 0) return false;
+
+  int fd = open(filename.c_str(), O_RDWR | O_CREAT, 0755);
+  if (fd < 0) {
+    LOG_ERROR("open file failed, file:%s, msg:%s", filename.c_str(),
+              strerror(errno));
+    return false;
+  }
+
+  int byte_write = 0;
+  int byte_left = data.size();
+  const char* pData = data.c_str();
+  while (byte_left > 0) {
+    byte_write = write(fd, pData, byte_left);
+    if (byte_write == -1) {
+      LOG_ERROR("write data fail, data len:%zu, file:%s, msg:%s", data.size(),
+                filename.c_str(), strerror(errno));
+      close(fd);
+      return false;
+    }
+    byte_left -= byte_write;
+    pData += byte_write;
+  }
+  pData = NULL;
+
+  if (isSync) {
+    LOG_INFO("fsync with filename:%s", filename.c_str());
+    fsync(fd);
+  }
+  close(fd);
+
+  return true;
+}
+
+string MQClientAPIImpl::fetchNameServerAddr(const string& NSDomain) {
+  try {
+    string homeDir(UtilAll::getHomeDirectory());
+    string storePath = homeDir + "/logs/metaq-client4cpp/snapshot";
+
+    if (access(storePath.c_str(), F_OK) != 0) {
+      if (mkdir(storePath.c_str(), S_IRWXU | S_IRWXG | S_IRWXO) != 0) {
+        LOG_ERROR("create data dir:%s error", storePath.c_str());
+      }
+    }
+    string file(storePath);
+    string fileBak(storePath);
+    vector<string> ret_;
+    int retSize = UtilAll::Split(ret_, m_mqClientId, "@");
+    if(retSize==2){
+      file.append("/nameserver_addr-").append(ret_[retSize-1]);
+    }else{
+      LOG_ERROR("split mqClientId:%s fail", m_mqClientId.c_str());
+      file.append("/nameserver_addr-DEFAULT");
+    }
+    fileBak.append("/nameserver_addr.bak");
+    const string addrs = m_topAddressing->fetchNSAddr(NSDomain);
+    if (addrs.empty()) {
+      if (m_nameSrvAddr.empty()) {
+        LOG_INFO("Load the name server snapshot local file:%s", file.c_str());
+        if (access(file.c_str(), F_OK) == 0) {
+          ifstream snapshot_file(file, ios::binary);
+          istreambuf_iterator<char> beg(snapshot_file), end;
+          string filecontent(beg, end);
+          updateNameServerAddr(filecontent);
+          m_nameSrvAddr = filecontent;
+        } else {
+          LOG_WARN("The name server snapshot local file not exists");
+        }
+      }
+    } else {
+      if (m_firstFetchNameSrv == true) {
+        // it is the first time, so need to create the name server snapshot
+        // local file
+        m_firstFetchNameSrv = false;
+      }
+      if (addrs.compare(m_nameSrvAddr) != 0) {
+        LOG_INFO("name server address changed, old: %s, new: %s",
+                 m_nameSrvAddr.c_str(), addrs.c_str());
+        updateNameServerAddr(addrs);
+        m_nameSrvAddr = addrs;
+      } else {
+        if (!m_firstFetchNameSrv) return m_nameSrvAddr;
+      }
+      // update the snapshot local file if nameSrv changes or
+      // m_firstFetchNameSrv==true
+      if (writeDataToFile(fileBak, addrs, true)) {
+        if (rename(fileBak.c_str(), file.c_str()) == -1)
+          LOG_ERROR("could not rename bak file:%s", strerror(errno));
+      }
+    }
+
+    if (access(file.c_str(), F_OK) != 0) {
+      // the name server snapshot local file maybe deleted by force, create it
+      if (writeDataToFile(fileBak, m_nameSrvAddr, true)) {
+        if (rename(fileBak.c_str(), file.c_str()) == -1)
+          LOG_ERROR("could not rename bak file:%s", strerror(errno));
+      }
+    }
+  } catch (...) {
+  }
+  return m_nameSrvAddr;
+}
+
+void MQClientAPIImpl::updateNameServerAddr(const string& addrs) {
+  if (m_pRemotingClient != NULL)
+    m_pRemotingClient->updateNameServerAddressList(addrs);
+}
+
+void MQClientAPIImpl::callSignatureBeforeRequest(
+    const string& addr, RemotingCommand& request,
+    const SessionCredentials& session_credentials) {
+  ClientRPCHook rpcHook(session_credentials);
+  rpcHook.doBeforeRequest(addr, request);
+}
+
+// Note: all request rules: throw exception if got broker error response,
+// exclude getTopicRouteInfoFromNameServer and unregisterClient
+void MQClientAPIImpl::createTopic(
+    const string& addr, const string& defaultTopic, TopicConfig topicConfig,
+    const SessionCredentials& sessionCredentials) {
+  string topicWithProjectGroup = topicConfig.getTopicName();
+  CreateTopicRequestHeader* requestHeader = new CreateTopicRequestHeader();
+  requestHeader->topic = (topicWithProjectGroup);
+  requestHeader->defaultTopic = (defaultTopic);
+  requestHeader->readQueueNums = (topicConfig.getReadQueueNums());
+  requestHeader->writeQueueNums = (topicConfig.getWriteQueueNums());
+  requestHeader->perm = (topicConfig.getPerm());
+  requestHeader->topicFilterType = (topicConfig.getTopicFilterType());
+
+  RemotingCommand request(UPDATE_AND_CREATE_TOPIC, requestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE:
+        return;
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+SendResult MQClientAPIImpl::sendMessage(
+    const string& addr, const string& brokerName, const MQMessage& msg,
+    SendMessageRequestHeader* pRequestHeader, int timeoutMillis,
+    int communicationMode, SendCallback* pSendCallback,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(SEND_MESSAGE, pRequestHeader);
+  string body = msg.getBody();
+  request.SetBody(body.c_str(), body.length());
+  request.setMsgBody(body);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  switch (communicationMode) {
+    case ComMode_ONEWAY:
+      m_pRemotingClient->invokeOneway(addr, request);
+      break;
+    case ComMode_ASYNC:
+      sendMessageAsync(addr, brokerName, msg, request, pSendCallback,
+                       timeoutMillis);
+      break;
+    case ComMode_SYNC:
+      return sendMessageSync(addr, brokerName, msg, request, timeoutMillis);
+    default:
+      break;
+  }
+  return SendResult();
+}
+
+void MQClientAPIImpl::sendHearbeat(
+    const string& addr, HeartbeatData* pHeartbeatData,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(HEART_BEAT, NULL);
+
+  string body;
+  pHeartbeatData->Encode(body);
+  request.SetBody(body.data(), body.length());
+  request.setMsgBody(body);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  if (m_pRemotingClient->invokeHeartBeat(addr, request)) {
+    LOG_INFO("sendheartbeat to broker:%s success", addr.c_str());
+  }
+}
+
+void MQClientAPIImpl::unregisterClient(
+    const string& addr, const string& clientID, const string& producerGroup,
+    const string& consumerGroup, const SessionCredentials& sessionCredentials) {
+  LOG_INFO("unregisterClient to broker:%s", addr.c_str());
+  RemotingCommand request(UNREGISTER_CLIENT,
+                          new UnregisterClientRequestHeader(
+                              clientID, producerGroup, consumerGroup));
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE:
+        LOG_INFO("unregisterClient to:%s success", addr.c_str());
+        return;
+      default:
+        break;
+    }
+    LOG_WARN("unregisterClient fail:%s,%d", response->getRemark().c_str(),
+             response->getCode());
+  }
+}
+
+// return NULL if got no response or error response
+TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(
+    const string& topic, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(GET_ROUTEINTO_BY_TOPIC,
+                          new GetRouteInfoRequestHeader(topic));
+  callSignatureBeforeRequest("", request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync("", request, timeoutMillis));
+
+  if (pResponse != NULL) {
+    if (((*(pResponse->GetBody())).getSize() == 0) ||
+        ((*(pResponse->GetBody())).getData() != NULL)) {
+      switch (pResponse->getCode()) {
+        case SUCCESS_VALUE: {
+          const MemoryBlock* pbody = pResponse->GetBody();
+          if (pbody->getSize()) {
+            TopicRouteData* topicRoute = TopicRouteData::Decode(pbody);
+            return topicRoute;
+          }
+        }
+        case TOPIC_NOT_EXIST: {
+          return NULL;
+        }
+        default:
+          break;
+      }
+      LOG_WARN("%s,%d", pResponse->getRemark().c_str(), pResponse->getCode());
+      return NULL;
+    }
+  }
+  return NULL;
+}
+
+TopicList* MQClientAPIImpl::getTopicListFromNameServer(
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(GET_ALL_TOPIC_LIST_FROM_NAMESERVER, NULL);
+  callSignatureBeforeRequest("", request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync("", request));
+  if (pResponse != NULL) {
+    if (((*(pResponse->GetBody())).getSize() == 0) ||
+        ((*(pResponse->GetBody())).getData() != NULL)) {
+      switch (pResponse->getCode()) {
+        case SUCCESS_VALUE: {
+          const MemoryBlock* pbody = pResponse->GetBody();
+          if (pbody->getSize()) {
+            TopicList* topicList = TopicList::Decode(pbody);
+            return topicList;
+          }
+        }
+        default:
+          break;
+      }
+
+      THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(),
+                        pResponse->getCode());
+    }
+  }
+  return NULL;
+}
+
+int MQClientAPIImpl::wipeWritePermOfBroker(const string& namesrvAddr,
+                                           const string& brokerName,
+                                           int timeoutMillis) {
+  return 0;
+}
+
+void MQClientAPIImpl::deleteTopicInBroker(const string& addr,
+                                          const string& topic,
+                                          int timeoutMillis) {}
+
+void MQClientAPIImpl::deleteTopicInNameServer(const string& addr,
+                                              const string& topic,
+                                              int timeoutMillis) {}
+
+void MQClientAPIImpl::deleteSubscriptionGroup(const string& addr,
+                                              const string& groupName,
+                                              int timeoutMillis) {}
+
+string MQClientAPIImpl::getKVConfigByValue(const string& projectNamespace,
+                                           const string& projectGroup,
+                                           int timeoutMillis) {
+  return "";
+}
+
+KVTable MQClientAPIImpl::getKVListByNamespace(const string& projectNamespace,
+                                              int timeoutMillis) {
+  return KVTable();
+}
+
+void MQClientAPIImpl::deleteKVConfigByValue(const string& projectNamespace,
+                                            const string& projectGroup,
+                                            int timeoutMillis) {}
+
+SendResult MQClientAPIImpl::sendMessageSync(const string& addr,
+                                            const string& brokerName,
+                                            const MQMessage& msg,
+                                            RemotingCommand& request,
+                                            int timeoutMillis) {
+  //<!block util response;
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+  if (pResponse != NULL) {
+    try {
+      LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s",
+                msg.toString().c_str(), addr.c_str(), brokerName.c_str());
+      SendResult result = processSendResponse(brokerName, msg, pResponse.get());
+      return result;
+    } catch (...) {
+      LOG_ERROR("send error");
+    }
+  }
+  THROW_MQEXCEPTION(MQClientException, "response is null", -1);
+}
+
+void MQClientAPIImpl::sendMessageAsync(const string& addr,
+                                       const string& brokerName,
+                                       const MQMessage& msg,
+                                       RemotingCommand& request,
+                                       SendCallback* pSendCallback,
+                                       int64 timeoutMilliseconds) {
+  //<!delete in future;
+  AsyncCallbackWrap* cbw =
+      new SendCallbackWrap(brokerName, msg, pSendCallback, this);
+  if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds) ==
+      false) {
+    LOG_ERROR("sendMessageAsync failed to addr:%s", addr.c_str());
+    if (cbw) {
+      cbw->onException();
+      deleteAndZero(cbw);
+    } else {
+      THROW_MQEXCEPTION(MQClientException, "sendMessageAsync failed", -1);
+    }
+  }
+}
+
+PullResult* MQClientAPIImpl::pullMessage(
+    const string& addr, PullMessageRequestHeader* pRequestHeader,
+    int timeoutMillis, int communicationMode, PullCallback* pullCallback,
+    void* pArg, const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(PULL_MESSAGE, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  switch (communicationMode) {
+    case ComMode_ONEWAY:
+      break;
+    case ComMode_ASYNC:
+      pullMessageAsync(addr, request, timeoutMillis, pullCallback, pArg);
+      break;
+    case ComMode_SYNC:
+      return pullMessageSync(addr, request, timeoutMillis);
+    default:
+      break;
+  }
+
+  return NULL;
+}
+
+void MQClientAPIImpl::pullMessageAsync(const string& addr,
+                                       RemotingCommand& request,
+                                       int timeoutMillis,
+                                       PullCallback* pullCallback, void* pArg) {
+  //<!delete in future;
+  AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+  if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) ==
+      false) {
+    LOG_ERROR("pullMessageAsync failed of addr:%s", addr.c_str());
+    deleteAndZero(cbw);
+    THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
+  }
+}
+
+PullResult* MQClientAPIImpl::pullMessageSync(const string& addr,
+                                             RemotingCommand& request,
+                                             int timeoutMillis) {
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+  if (pResponse != NULL) {
+    if (((*(pResponse->GetBody())).getSize() == 0) ||
+        ((*(pResponse->GetBody())).getData() != NULL)) {
+      try {
+        PullResult* pullResult =
+            processPullResponse(pResponse.get());  // pullMessage will handle
+                                                   // exception from
+                                                   // processPullResponse
+        return pullResult;
+      } catch (MQException& e) {
+        LOG_ERROR(e.what());
+        return NULL;
+      }
+    }
+  }
+  return NULL;
+}
+
+SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
+                                                const MQMessage& msg,
+                                                RemotingCommand* pResponse) {
+  SendStatus sendStatus = SEND_OK;
+  int res = 0;
+  switch (pResponse->getCode()) {
+    case FLUSH_DISK_TIMEOUT:
+      sendStatus = SEND_FLUSH_DISK_TIMEOUT;
+      break;
+    case FLUSH_SLAVE_TIMEOUT:
+      sendStatus = SEND_FLUSH_SLAVE_TIMEOUT;
+      break;
+    case SLAVE_NOT_AVAILABLE:
+      sendStatus = SEND_SLAVE_NOT_AVAILABLE;
+      break;
+    case SUCCESS_VALUE:
+      sendStatus = SEND_OK;
+      break;
+    default:
+      res = -1;
+      break;
+  }
+  if (res == 0) {
+    SendMessageResponseHeader* responseHeader =
+        (SendMessageResponseHeader*)pResponse->getCommandHeader();
+    MQMessageQueue messageQueue(msg.getTopic(), brokerName,
+                                responseHeader->queueId);
+    return SendResult(sendStatus, responseHeader->msgId, messageQueue,
+                      responseHeader->queueOffset);
+  }
+  LOG_ERROR("processSendResponse error remark:%s, error code:%d",
+            (pResponse->getRemark()).c_str(), pResponse->getCode());
+  THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(),
+                    pResponse->getCode());
+}
+
+PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* pResponse) {
+  PullStatus pullStatus = NO_NEW_MSG;
+  switch (pResponse->getCode()) {
+    case SUCCESS_VALUE:
+      pullStatus = FOUND;
+      break;
+    case PULL_NOT_FOUND:
+      pullStatus = NO_NEW_MSG;
+      break;
+    case PULL_RETRY_IMMEDIATELY:
+      pullStatus = NO_MATCHED_MSG;
+      break;
+    case PULL_OFFSET_MOVED:
+      pullStatus = OFFSET_ILLEGAL;
+      break;
+    default:
+      THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                        pResponse->getCode());
+      break;
+  }
+
+  PullMessageResponseHeader* responseHeader =
+      static_cast<PullMessageResponseHeader*>(pResponse->getCommandHeader());
+
+  if (!responseHeader) {
+    LOG_ERROR("processPullResponse:responseHeader is NULL");
+    THROW_MQEXCEPTION(MQClientException,
+                      "processPullResponse:responseHeader is NULL", -1);
+  }
+  //<!get body,delete outsite;
+  MemoryBlock bodyFromResponse =
+      *(pResponse->GetBody());  // response data judgement had been done outside
+                                // of processPullResponse
+  if (bodyFromResponse.getSize() == 0) {
+    if (pullStatus != FOUND) {
+      return new PullResultExt(pullStatus, responseHeader->nextBeginOffset,
+                               responseHeader->minOffset,
+                               responseHeader->maxOffset,
+                               (int)responseHeader->suggestWhichBrokerId);
+    } else {
+      THROW_MQEXCEPTION(MQClientException,
+                        "memoryBody size is 0, but pullStatus equals found",
+                        -1);
+    }
+  } else {
+    return new PullResultExt(
+        pullStatus, responseHeader->nextBeginOffset, responseHeader->minOffset,
+        responseHeader->maxOffset, (int)responseHeader->suggestWhichBrokerId,
+        bodyFromResponse);
+  }
+}
+
+//<!***************************************************************************
+int64 MQClientAPIImpl::getMinOffset(
+    const string& addr, const string& topic, int queueId, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  GetMinOffsetRequestHeader* pRequestHeader = new GetMinOffsetRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+
+  RemotingCommand request(GET_MIN_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+        GetMinOffsetResponseHeader* responseHeader =
+            (GetMinOffsetResponseHeader*)response->getCommandHeader();
+
+        int64 offset = responseHeader->offset;
+        return offset;
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+int64 MQClientAPIImpl::getMaxOffset(
+    const string& addr, const string& topic, int queueId, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  GetMaxOffsetRequestHeader* pRequestHeader = new GetMaxOffsetRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+
+  RemotingCommand request(GET_MAX_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+        GetMaxOffsetResponseHeader* responseHeader =
+            (GetMaxOffsetResponseHeader*)response->getCommandHeader();
+
+        int64 offset = responseHeader->offset;
+        return offset;
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+int64 MQClientAPIImpl::searchOffset(
+    const string& addr, const string& topic, int queueId, uint64_t timestamp,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  SearchOffsetRequestHeader* pRequestHeader = new SearchOffsetRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+  pRequestHeader->timestamp = timestamp;
+
+  RemotingCommand request(SEARCH_OFFSET_BY_TIMESTAMP, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+        SearchOffsetResponseHeader* responseHeader =
+            (SearchOffsetResponseHeader*)response->getCommandHeader();
+
+        int64 offset = responseHeader->offset;
+        return offset;
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+MQMessageExt* MQClientAPIImpl::viewMessage(
+    const string& addr, int64 phyoffset, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  ViewMessageRequestHeader* pRequestHeader = new ViewMessageRequestHeader();
+  pRequestHeader->offset = phyoffset;
+
+  RemotingCommand request(VIEW_MESSAGE_BY_ID, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+int64 MQClientAPIImpl::getEarliestMsgStoretime(
+    const string& addr, const string& topic, int queueId, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  GetEarliestMsgStoretimeRequestHeader* pRequestHeader =
+      new GetEarliestMsgStoretimeRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+
+  RemotingCommand request(GET_EARLIEST_MSG_STORETIME, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+        GetEarliestMsgStoretimeResponseHeader* responseHeader =
+            (GetEarliestMsgStoretimeResponseHeader*)
+                response->getCommandHeader();
+
+        int64 timestamp = responseHeader->timestamp;
+        return timestamp;
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+void MQClientAPIImpl::getConsumerIdListByGroup(
+    const string& addr, const string& consumerGroup, vector<string>& cids,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  GetConsumerListByGroupRequestHeader* pRequestHeader =
+      new GetConsumerListByGroupRequestHeader();
+  pRequestHeader->consumerGroup = consumerGroup;
+
+  RemotingCommand request(GET_CONSUMER_LIST_BY_GROUP, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (pResponse != NULL) {
+    if ((pResponse->GetBody()->getSize() == 0) ||
+        (pResponse->GetBody()->getData() != NULL)) {
+      switch (pResponse->getCode()) {
+        case SUCCESS_VALUE: {
+          const MemoryBlock* pbody = pResponse->GetBody();
+          if (pbody->getSize()) {
+            GetConsumerListByGroupResponseBody::Decode(pbody, cids);
+            return;
+          }
+        }
+        default:
+          break;
+      }
+      THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                        pResponse->getCode());
+    }
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+int64 MQClientAPIImpl::queryConsumerOffset(
+    const string& addr, QueryConsumerOffsetRequestHeader* pRequestHeader,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(QUERY_CONSUMER_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+        QueryConsumerOffsetResponseHeader* responseHeader =
+            (QueryConsumerOffsetResponseHeader*)response->getCommandHeader();
+        int64 consumerOffset = responseHeader->offset;
+        return consumerOffset;
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  return -1;
+}
+
+void MQClientAPIImpl::updateConsumerOffset(
+    const string& addr, UpdateConsumerOffsetRequestHeader* pRequestHeader,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(UPDATE_CONSUMER_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+        return;
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+void MQClientAPIImpl::updateConsumerOffsetOneway(
+    const string& addr, UpdateConsumerOffsetRequestHeader* pRequestHeader,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(UPDATE_CONSUMER_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  m_pRemotingClient->invokeOneway(addr, request);
+}
+
+void MQClientAPIImpl::consumerSendMessageBack(
+    MQMessageExt& msg, const string& consumerGroup, int delayLevel,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  ConsumerSendMsgBackRequestHeader* pRequestHeader =
+      new ConsumerSendMsgBackRequestHeader();
+  pRequestHeader->group = consumerGroup;
+  pRequestHeader->offset = msg.getCommitLogOffset();
+  pRequestHeader->delayLevel = delayLevel;
+
+  string addr = socketAddress2IPPort(msg.getStoreHost());
+  RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    switch (response->getCode()) {
+      case SUCCESS_VALUE: {
+        return;
+      }
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+void MQClientAPIImpl::lockBatchMQ(
+    const string& addr, LockBatchRequestBody* requestBody,
+    vector<MQMessageQueue>& mqs, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(LOCK_BATCH_MQ, NULL);
+  string body;
+  requestBody->Encode(body);
+  request.SetBody(body.data(), body.length());
+  request.setMsgBody(body);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (pResponse != NULL) {
+    if (((*(pResponse->GetBody())).getSize() == 0) ||
+        ((*(pResponse->GetBody())).getData() != NULL)) {
+      switch (pResponse->getCode()) {
+        case SUCCESS_VALUE: {
+          const MemoryBlock* pbody = pResponse->GetBody();
+          if (pbody->getSize()) {
+            LockBatchResponseBody::Decode(pbody, mqs);
+          }
+          return;
+        } break;
+        default:
+          break;
+      }
+      THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                        pResponse->getCode());
+    }
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+void MQClientAPIImpl::unlockBatchMQ(
+    const string& addr, UnlockBatchRequestBody* requestBody, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(UNLOCK_BATCH_MQ, NULL);
+  string body;
+  requestBody->Encode(body);
+  request.SetBody(body.data(), body.length());
+  request.setMsgBody(body);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (pResponse != NULL) {
+    switch (pResponse->getCode()) {
+      case SUCCESS_VALUE: {
+        return;
+      } break;
+      default:
+        break;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                      pResponse->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+}
+
+//<!************************************************************************
+}  //<!end namespace;

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/MQClientAPIImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/MQClientAPIImpl.h b/rocketmq-cpp/src/MQClientAPIImpl.h
new file mode 100644
index 0000000..31e61a0
--- /dev/null
+++ b/rocketmq-cpp/src/MQClientAPIImpl.h
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MQCLIENTAPIIMPL_H__
+#define __MQCLIENTAPIIMPL_H__
+#include "AsyncCallback.h"
+#include "ClientRPCHook.h"
+#include "ClientRemotingProcessor.h"
+#include "CommandHeader.h"
+#include "HeartbeatData.h"
+#include "KVTable.h"
+#include "LockBatchBody.h"
+#include "MQClientException.h"
+#include "MQMessageExt.h"
+#include "MQProtos.h"
+#include "SendResult.h"
+#include "SocketUtil.h"
+#include "TcpRemotingClient.h"
+#include "TopAddressing.h"
+#include "TopicConfig.h"
+#include "TopicList.h"
+#include "TopicRouteData.h"
+#include "UtilAll.h"
+#include "VirtualEnvUtil.h"
+
+namespace rocketmq {
+//<!wrap all API to net ;
+//<!************************************************************************
+class MQClientAPIImpl {
+ public:
+  MQClientAPIImpl(const string& mqClientId, ClientRemotingProcessor* clientRemotingProcessor,
+                  int pullThreadNum, uint64_t tcpConnectTimeout,
+                  uint64_t tcpTransportTryLockTimeout, string unitName);
+  virtual ~MQClientAPIImpl();
+  void stopAllTcpTransportThread();
+  bool writeDataToFile(string filename, string data, bool isSync);
+  string fetchNameServerAddr(const string& NSDomain);
+  void updateNameServerAddr(const string& addrs);
+
+  void callSignatureBeforeRequest(
+      const string& addr, RemotingCommand& request,
+      const SessionCredentials& session_credentials);
+  void createTopic(const string& addr, const string& defaultTopic,
+                   TopicConfig topicConfig,
+                   const SessionCredentials& sessionCredentials);
+
+  SendResult sendMessage(const string& addr, const string& brokerName,
+                         const MQMessage& msg,
+                         SendMessageRequestHeader* pRequestHeader,
+                         int timeoutMillis, int communicationMode,
+                         SendCallback* pSendCallback,
+                         const SessionCredentials& sessionCredentials);
+
+  PullResult* pullMessage(const string& addr,
+                          PullMessageRequestHeader* pRequestHeader,
+                          int timeoutMillis, int communicationMode,
+                          PullCallback* pullCallback, void* pArg,
+                          const SessionCredentials& sessionCredentials);
+
+  void sendHearbeat(const string& addr, HeartbeatData* pHeartbeatData,
+                    const SessionCredentials& sessionCredentials);
+
+  void unregisterClient(const string& addr, const string& clientID,
+                        const string& producerGroup,
+                        const string& consumerGroup,
+                        const SessionCredentials& sessionCredentials);
+
+  TopicRouteData* getTopicRouteInfoFromNameServer(
+      const string& topic, int timeoutMillis,
+      const SessionCredentials& sessionCredentials);
+
+  TopicList* getTopicListFromNameServer(
+      const SessionCredentials& sessionCredentials);
+
+  int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName,
+                            int timeoutMillis);
+
+  void deleteTopicInBroker(const string& addr, const string& topic,
+                           int timeoutMillis);
+
+  void deleteTopicInNameServer(const string& addr, const string& topic,
+                               int timeoutMillis);
+
+  void deleteSubscriptionGroup(const string& addr, const string& groupName,
+                               int timeoutMillis);
+
+  string getKVConfigByValue(const string& projectNamespace,
+                            const string& projectGroup, int timeoutMillis);
+
+  KVTable getKVListByNamespace(const string& projectNamespace,
+                               int timeoutMillis);
+
+  void deleteKVConfigByValue(const string& projectNamespace,
+                             const string& projectGroup, int timeoutMillis);
+
+  SendResult processSendResponse(const string& brokerName, const MQMessage& msg,
+                                 RemotingCommand* pResponse);
+
+  PullResult* processPullResponse(RemotingCommand* pResponse);
+
+  int64 getMinOffset(const string& addr, const string& topic, int queueId,
+                     int timeoutMillis,
+                     const SessionCredentials& sessionCredentials);
+
+  int64 getMaxOffset(const string& addr, const string& topic, int queueId,
+                     int timeoutMillis,
+                     const SessionCredentials& sessionCredentials);
+
+  int64 searchOffset(const string& addr, const string& topic, int queueId,
+                     uint64_t timestamp, int timeoutMillis,
+                     const SessionCredentials& sessionCredentials);
+
+  MQMessageExt* viewMessage(const string& addr, int64 phyoffset,
+                            int timeoutMillis,
+                            const SessionCredentials& sessionCredentials);
+
+  int64 getEarliestMsgStoretime(const string& addr, const string& topic,
+                                int queueId, int timeoutMillis,
+                                const SessionCredentials& sessionCredentials);
+
+  void getConsumerIdListByGroup(const string& addr, const string& consumerGroup,
+                                vector<string>& cids, int timeoutMillis,
+                                const SessionCredentials& sessionCredentials);
+
+  int64 queryConsumerOffset(const string& addr,
+                            QueryConsumerOffsetRequestHeader* pRequestHeader,
+                            int timeoutMillis,
+                            const SessionCredentials& sessionCredentials);
+
+  void updateConsumerOffset(const string& addr,
+                            UpdateConsumerOffsetRequestHeader* pRequestHeader,
+                            int timeoutMillis,
+                            const SessionCredentials& sessionCredentials);
+
+  void updateConsumerOffsetOneway(
+      const string& addr, UpdateConsumerOffsetRequestHeader* pRequestHeader,
+      int timeoutMillis, const SessionCredentials& sessionCredentials);
+
+  void consumerSendMessageBack(MQMessageExt& msg, const string& consumerGroup,
+                               int delayLevel, int timeoutMillis,
+                               const SessionCredentials& sessionCredentials);
+
+  void lockBatchMQ(const string& addr, LockBatchRequestBody* requestBody,
+                   vector<MQMessageQueue>& mqs, int timeoutMillis,
+                   const SessionCredentials& sessionCredentials);
+
+  void unlockBatchMQ(const string& addr, UnlockBatchRequestBody* requestBody,
+                     int timeoutMillis,
+                     const SessionCredentials& sessionCredentials);
+
+ private:
+  SendResult sendMessageSync(const string& addr, const string& brokerName,
+                             const MQMessage& msg, RemotingCommand& request,
+                             int timeoutMillis);
+
+  void sendMessageAsync(const string& addr, const string& brokerName,
+                        const MQMessage& msg, RemotingCommand& request,
+                        SendCallback* pSendCallback, int64 timeoutMilliseconds);
+
+  PullResult* pullMessageSync(const string& addr, RemotingCommand& request,
+                              int timeoutMillis);
+
+  void pullMessageAsync(const string& addr, RemotingCommand& request,
+                        int timeoutMillis, PullCallback* pullCallback,
+                        void* pArg);
+
+ private:
+  unique_ptr<TcpRemotingClient> m_pRemotingClient;
+  unique_ptr<TopAddressing> m_topAddressing;
+  string m_nameSrvAddr;
+  bool m_firstFetchNameSrv;
+  string m_mqClientId;
+};
+}  //<!end namespace;
+//<!***************************************************************************
+#endif