You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2017/09/04 05:04:57 UTC
[11/14] incubator-rocketmq-externals git commit: upload rocketmq-cpp
code
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/libs/signature/src/sha512.c
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/libs/signature/src/sha512.c b/rocketmq-cpp/libs/signature/src/sha512.c
new file mode 100755
index 0000000..c64aec6
--- /dev/null
+++ b/rocketmq-cpp/libs/signature/src/sha512.c
@@ -0,0 +1,616 @@
+/* sha512.c - Functions to compute SHA512 and SHA384 message digest of files or
+ memory blocks according to the NIST specification FIPS-180-2.
+
+ Copyright (C) 2005, 2006, 2008 Free Software Foundation, Inc.
+
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>. */
+
+/* Written by David Madore, considerably copypasting from
+ Scott G. Miller's sha1.c
+*/
+
+/* #include <config.h> */
+
+#include "sha512.h"
+
+#include <stddef.h>
+#include <string.h>
+
+#if USE_UNLOCKED_IO
+# include "unlocked-io.h"
+#endif
+
+#ifdef __cplusplus
+namespace metaqSignature{
+#endif
+
+#ifdef WORDS_BIGENDIAN /* host words are already most-significant-byte first: SWAP is the identity */
+# define SWAP(n) (n)
+#else /* otherwise SWAP reverses the eight bytes of N; N is expanded several times, so it must be free of side effects */
+# define SWAP(n) \
+ u64or (u64or (u64or (u64shl (n, 56), \
+ u64shl (u64and (n, u64lo (0x0000ff00)), 40)), \
+ u64or (u64shl (u64and (n, u64lo (0x00ff0000)), 24), \
+ u64shl (u64and (n, u64lo (0xff000000)), 8))), \
+ u64or (u64or (u64and (u64shr (n, 8), u64lo (0xff000000)), \
+ u64and (u64shr (n, 24), u64lo (0x00ff0000))), \
+ u64or (u64and (u64shr (n, 40), u64lo (0x0000ff00)), \
+ u64shr (n, 56))))
+#endif
+
+#define BLOCKSIZE 4096
+#if BLOCKSIZE % 128 != 0
+# error "invalid BLOCKSIZE"
+#endif
+
+/* This array contains the bytes used to pad the buffer to the next
+ 128-byte boundary. */
+static const unsigned char fillbuf[128] = { 0x80, 0 /* , 0, 0, ... */ };
+
+
+/*
+   Prepare CTX for a fresh SHA-512 computation: clear the count of
+   buffered bytes and the two-word running byte total, then load the
+   eight 64-bit initial hash values.  Call this before feeding any data
+   through CTX.
+*/
+void
+sha512_init_ctx (struct sha512_ctx *ctx)
+{
+  u64 *iv = ctx->state;
+
+  ctx->buflen = 0;
+  ctx->total[0] = u64lo (0);
+  ctx->total[1] = u64lo (0);
+
+  iv[0] = u64hilo (0x6a09e667, 0xf3bcc908);
+  iv[1] = u64hilo (0xbb67ae85, 0x84caa73b);
+  iv[2] = u64hilo (0x3c6ef372, 0xfe94f82b);
+  iv[3] = u64hilo (0xa54ff53a, 0x5f1d36f1);
+  iv[4] = u64hilo (0x510e527f, 0xade682d1);
+  iv[5] = u64hilo (0x9b05688c, 0x2b3e6c1f);
+  iv[6] = u64hilo (0x1f83d9ab, 0xfb41bd6b);
+  iv[7] = u64hilo (0x5be0cd19, 0x137e2179);
+}
+
+/* Prepare CTX for a fresh SHA-384 computation.  Identical to
+   sha512_init_ctx except for the eight initial hash values.  */
+void
+sha384_init_ctx (struct sha512_ctx *ctx)
+{
+  u64 *iv = ctx->state;
+
+  ctx->buflen = 0;
+  ctx->total[0] = u64lo (0);
+  ctx->total[1] = u64lo (0);
+
+  iv[0] = u64hilo (0xcbbb9d5d, 0xc1059ed8);
+  iv[1] = u64hilo (0x629a292a, 0x367cd507);
+  iv[2] = u64hilo (0x9159015a, 0x3070dd17);
+  iv[3] = u64hilo (0x152fecd8, 0xf70e5939);
+  iv[4] = u64hilo (0x67332667, 0xffc00b31);
+  iv[5] = u64hilo (0x8eb44a87, 0x68581511);
+  iv[6] = u64hilo (0xdb0c2e0d, 0x64f98fa7);
+  iv[7] = u64hilo (0x47b5481d, 0xbefa4fa4);
+}
+
+/* Copy the value from V into the memory location pointed to by *CP,
+ If your architecture allows unaligned access, this is equivalent to
+ * (__typeof__ (v) *) cp = v
+ The copy goes through memcpy, so CP needs no particular alignment;
+ exactly sizeof v bytes are written, in the host's native byte order.
+ The preprocessor test below only selects the compiler's spelling of
+ the inline keyword. */
+#ifdef WIN32
+static _inline void
+#else
+static __inline__ void
+#endif
+set_uint64 (char *cp, u64 v)
+{
+ memcpy (cp, &v, sizeof v);
+}
+
+/* Store the current hash state of CTX at RESBUF as eight consecutive
+   64-bit words and return RESBUF.  Each word goes through SWAP first,
+   so on hosts without WORDS_BIGENDIAN its bytes are reversed before
+   being copied out.  RESBUF may be unaligned; it must have room for
+   8 * sizeof ctx->state[0] bytes.  */
+void *
+sha512_read_ctx (const struct sha512_ctx *ctx, void *resbuf)
+{
+  char *out = (char *) resbuf;
+  size_t word;
+
+  for (word = 0; word < 8; word++, out += sizeof ctx->state[0])
+    set_uint64 (out, SWAP (ctx->state[word]));
+
+  return resbuf;
+}
+
+/* SHA-384 variant of sha512_read_ctx: only the first six state words
+   of CTX are stored at RESBUF (6 * sizeof ctx->state[0] bytes).
+   Returns RESBUF.  */
+void *
+sha384_read_ctx (const struct sha512_ctx *ctx, void *resbuf)
+{
+  char *out = (char *) resbuf;
+  size_t word;
+
+  for (word = 0; word < 6; word++, out += sizeof ctx->state[0])
+    set_uint64 (out, SWAP (ctx->state[word]));
+
+  return resbuf;
+}
+
+/* Finish hashing CTX: add the bytes still sitting in ctx->buffer to
+ the running total, append the padding (a 0x80 byte followed by zeros,
+ taken from fillbuf) and the 128-bit message length in bits, then run
+ the resulting one or two 128-byte blocks through sha512_process_block.
+ Nothing is written to a result buffer here; the callers read the
+ digest out of ctx->state afterwards. */
+static void
+sha512_conclude_ctx (struct sha512_ctx *ctx)
+{
+ /* Take yet unprocessed bytes into account. */
+ size_t bytes = ctx->buflen;
+ size_t size = (bytes < 112) ? 128 / 8 : 128 * 2 / 8; /* final length in 64-bit words: 16 (one block) or 32 (two); below 112 bytes the pad byte and the 16 length bytes still fit in one block */
+
+ /* Now count remaining bytes. */
+ ctx->total[0] = u64plus (ctx->total[0], u64lo (bytes));
+ if (u64lt (ctx->total[0], u64lo (bytes))) /* low word wrapped around: carry into the high word */
+ ctx->total[1] = u64plus (ctx->total[1], u64lo (1));
+
+ /* Put the 128-bit file length in *bits* at the end of the buffer.
+ Use set_uint64 rather than a simple assignment, to avoid risk of
+ unaligned access.  The byte count is turned into a bit count by
+ shifting the two-word total left by 3; the top 3 bits of the low
+ word move into the high word. */
+ set_uint64 ((char *) &ctx->buffer[size - 2],
+ SWAP (u64or (u64shl (ctx->total[1], 3),
+ u64shr (ctx->total[0], 61))));
+ set_uint64 ((char *) &ctx->buffer[size - 1],
+ SWAP (u64shl (ctx->total[0], 3)));
+
+ memcpy (&((char *) ctx->buffer)[bytes], fillbuf, (size - 2) * 8 - bytes); /* pad from the end of the data up to the length field; at most 128 bytes, the size of fillbuf */
+
+ /* Process last bytes. */
+ sha512_process_block (ctx->buffer, size * 8, ctx);
+}
+
+/* Complete the SHA-512 computation held in CTX (padding and final
+   blocks) and store the eight state words at RESBUF, which is also the
+   return value.  */
+void *
+sha512_finish_ctx (struct sha512_ctx *ctx, void *resbuf)
+{
+  void *digest;
+
+  sha512_conclude_ctx (ctx);
+  digest = sha512_read_ctx (ctx, resbuf);
+  return digest;
+}
+
+/* Complete the SHA-384 computation held in CTX (padding and final
+   blocks) and store the first six state words at RESBUF, which is also
+   the return value.  */
+void *
+sha384_finish_ctx (struct sha512_ctx *ctx, void *resbuf)
+{
+  void *digest;
+
+  sha512_conclude_ctx (ctx);
+  digest = sha384_read_ctx (ctx, resbuf);
+  return digest;
+}
+
+/* Shared driver for sha512_stream and sha384_stream.  Reads STREAM
+   until end of file in chunks of BLOCKSIZE bytes, hashes everything
+   read, and stores the digest at RESBLOCK.  INIT seeds the context and
+   FINISH pads and writes the digest; those two calls are the only
+   places where the SHA-512 and SHA-384 variants differ.
+   Returns 0 on success, or 1 when a read returns no data and ferror
+   reports an error on STREAM (RESBLOCK is then left untouched).  */
+static int
+sha512_stream_common (FILE *stream, void *resblock,
+                      void (*init) (struct sha512_ctx *),
+                      void *(*finish) (struct sha512_ctx *, void *))
+{
+  struct sha512_ctx ctx;
+  char buffer[BLOCKSIZE + 72];
+  size_t sum;
+
+  /* Initialize the computation context. */
+  init (&ctx);
+
+  /* Iterate over full file contents. */
+  while (1)
+    {
+      /* We read the file in blocks of BLOCKSIZE bytes.  One call of the
+         computation function processes the whole buffer so that with the
+         next round of the loop another block can be read. */
+      size_t n;
+      sum = 0;
+
+      /* Read block.  Take care for partial reads. */
+      while (1)
+        {
+          n = fread (buffer + sum, 1, BLOCKSIZE - sum, stream);
+
+          sum += n;
+
+          if (sum == BLOCKSIZE)
+            break;
+
+          if (n == 0)
+            {
+              /* Check for the error flag IFF N == 0, so that we don't
+                 exit the loop after a partial read due to e.g., EAGAIN
+                 or EWOULDBLOCK. */
+              if (ferror (stream))
+                return 1;
+              goto process_partial_block;
+            }
+
+          /* We've read at least one byte, so ignore errors.  But always
+             check for EOF, since feof may be true even though N > 0.
+             Otherwise, we could end up calling fread after EOF. */
+          if (feof (stream))
+            goto process_partial_block;
+        }
+
+      /* Process buffer with BLOCKSIZE bytes.  Note that
+         BLOCKSIZE % 128 == 0 */
+      sha512_process_block (buffer, BLOCKSIZE, &ctx);
+    }
+
+ process_partial_block:;
+
+  /* Process any remaining bytes. */
+  if (sum > 0)
+    sha512_process_bytes (buffer, sum, &ctx);
+
+  /* Construct result in desired memory. */
+  finish (&ctx, resblock);
+  return 0;
+}
+
+/* Compute SHA512 message digest for bytes read from STREAM.  The
+   resulting message digest number will be written into the 64 bytes
+   beginning at RESBLOCK.  Returns 0 on success, 1 on a read error.  */
+int
+sha512_stream (FILE *stream, void *resblock)
+{
+  return sha512_stream_common (stream, resblock,
+                               sha512_init_ctx, sha512_finish_ctx);
+}
+
+/* Compute SHA384 message digest for bytes read from STREAM.  The
+   digest (the six state words written by sha384_finish_ctx) is stored
+   at RESBLOCK.  Returns 0 on success, 1 on a read error.  */
+int
+sha384_stream (FILE *stream, void *resblock)
+{
+  return sha512_stream_common (stream, resblock,
+                               sha384_init_ctx, sha384_finish_ctx);
+}
+
+/* One-shot SHA-512: hash the LEN bytes starting at BUFFER and store
+   the digest at RESBLOCK.  Uses a context local to this call, so no
+   state is shared between invocations.  Returns RESBLOCK (the value
+   handed back by sha512_finish_ctx).  */
+void *
+sha512_buffer (const char *buffer, size_t len, void *resblock)
+{
+  struct sha512_ctx hash_state;
+
+  sha512_init_ctx (&hash_state);
+
+  /* Hashes all complete blocks and buffers the tail inside hash_state. */
+  sha512_process_bytes (buffer, len, &hash_state);
+
+  /* Pads, hashes the tail and writes the digest. */
+  return sha512_finish_ctx (&hash_state, resblock);
+}
+
+/* One-shot SHA-384: hash the LEN bytes starting at BUFFER and store
+   the digest at RESBLOCK.  Uses a context local to this call.  Returns
+   RESBLOCK (the value handed back by sha384_finish_ctx).  */
+void *
+sha384_buffer (const char *buffer, size_t len, void *resblock)
+{
+  struct sha512_ctx hash_state;
+
+  sha384_init_ctx (&hash_state);
+
+  /* Hashes all complete blocks and buffers the tail inside hash_state. */
+  sha512_process_bytes (buffer, len, &hash_state);
+
+  /* Pads, hashes the tail and writes the digest. */
+  return sha384_finish_ctx (&hash_state, resblock);
+}
+
+void
+sha512_process_bytes (const void *buffer, size_t len, struct sha512_ctx *ctx)
+{
+ /* Feed LEN bytes starting at BUFFER into CTX.  LEN may be any value:
+ complete 128-byte blocks are hashed right away, and whatever is left
+ over is kept in ctx->buffer (its length in ctx->buflen) for a later
+ call or for the finishing step.
+
+ When we already have some bits in our internal buffer concatenate
+ both inputs first. */
+ if (ctx->buflen != 0)
+ {
+ size_t left_over = ctx->buflen;
+ size_t add = 256 - left_over > len ? len : 256 - left_over; /* take no more than brings the buffered total to 256 bytes */
+
+ memcpy (&((char *) ctx->buffer)[left_over], buffer, add);
+ ctx->buflen += add;
+
+ if (ctx->buflen > 128)
+ {
+ sha512_process_block (ctx->buffer, ctx->buflen & ~127, ctx); /* hash the whole multiple of 128 bytes */
+
+ ctx->buflen &= 127;
+ /* The regions in the following copy operation cannot overlap. */
+ memcpy (ctx->buffer,
+ &((char *) ctx->buffer)[(left_over + add) & ~127],
+ ctx->buflen);
+ }
+
+ buffer = (const char *) buffer + add;
+ len -= add;
+ }
+
+ /* Process available complete blocks. */
+ if (len >= 128)
+ {
+#if !_STRING_ARCH_unaligned
+# define alignof(type) offsetof (struct { char c; type x; }, x)
+# define UNALIGNED_P(p) (((size_t) p) % alignof (u64) != 0)
+ if (UNALIGNED_P (buffer))
+ while (len > 128) /* strictly greater: a last full block is left for the tail code below */
+ {
+ sha512_process_block (memcpy (ctx->buffer, buffer, 128), 128, ctx); /* bounce each block through ctx->buffer instead of reading the unaligned input directly */
+ buffer = (const char *) buffer + 128;
+ len -= 128;
+ }
+ else
+#endif
+ {
+ sha512_process_block (buffer, len & ~127, ctx);
+ buffer = (const char *) buffer + (len & ~127);
+ len &= 127;
+ }
+ }
+
+ /* Move remaining bytes in internal buffer. */
+ if (len > 0)
+ {
+ size_t left_over = ctx->buflen;
+
+ memcpy (&((char *) ctx->buffer)[left_over], buffer, len);
+ left_over += len;
+ if (left_over >= 128)
+ {
+ sha512_process_block (ctx->buffer, 128, ctx);
+ left_over -= 128;
+ memcpy (ctx->buffer, &ctx->buffer[16], left_over); /* element 16 is byte offset 128 provided buffer elements are 8-byte words -- TODO confirm against sha512.h */
+ }
+ ctx->buflen = left_over;
+ }
+}
+
+/* --- Code below is the primary difference between sha1.c and sha512.c --- */
+
+/* SHA512 round constants: 80 64-bit words, each spelled as a pair of
+ 32-bit halves passed to u64init.  K(I) selects entry I; the block
+ function uses K(0) through K(79), one per round. */
+#define K(I) sha512_round_constants[I]
+static u64 const sha512_round_constants[80] = {
+ u64init (0x428a2f98, 0xd728ae22), u64init (0x71374491, 0x23ef65cd),
+ u64init (0xb5c0fbcf, 0xec4d3b2f), u64init (0xe9b5dba5, 0x8189dbbc),
+ u64init (0x3956c25b, 0xf348b538), u64init (0x59f111f1, 0xb605d019),
+ u64init (0x923f82a4, 0xaf194f9b), u64init (0xab1c5ed5, 0xda6d8118),
+ u64init (0xd807aa98, 0xa3030242), u64init (0x12835b01, 0x45706fbe),
+ u64init (0x243185be, 0x4ee4b28c), u64init (0x550c7dc3, 0xd5ffb4e2),
+ u64init (0x72be5d74, 0xf27b896f), u64init (0x80deb1fe, 0x3b1696b1),
+ u64init (0x9bdc06a7, 0x25c71235), u64init (0xc19bf174, 0xcf692694),
+ u64init (0xe49b69c1, 0x9ef14ad2), u64init (0xefbe4786, 0x384f25e3),
+ u64init (0x0fc19dc6, 0x8b8cd5b5), u64init (0x240ca1cc, 0x77ac9c65),
+ u64init (0x2de92c6f, 0x592b0275), u64init (0x4a7484aa, 0x6ea6e483),
+ u64init (0x5cb0a9dc, 0xbd41fbd4), u64init (0x76f988da, 0x831153b5),
+ u64init (0x983e5152, 0xee66dfab), u64init (0xa831c66d, 0x2db43210),
+ u64init (0xb00327c8, 0x98fb213f), u64init (0xbf597fc7, 0xbeef0ee4),
+ u64init (0xc6e00bf3, 0x3da88fc2), u64init (0xd5a79147, 0x930aa725),
+ u64init (0x06ca6351, 0xe003826f), u64init (0x14292967, 0x0a0e6e70),
+ u64init (0x27b70a85, 0x46d22ffc), u64init (0x2e1b2138, 0x5c26c926),
+ u64init (0x4d2c6dfc, 0x5ac42aed), u64init (0x53380d13, 0x9d95b3df),
+ u64init (0x650a7354, 0x8baf63de), u64init (0x766a0abb, 0x3c77b2a8),
+ u64init (0x81c2c92e, 0x47edaee6), u64init (0x92722c85, 0x1482353b),
+ u64init (0xa2bfe8a1, 0x4cf10364), u64init (0xa81a664b, 0xbc423001),
+ u64init (0xc24b8b70, 0xd0f89791), u64init (0xc76c51a3, 0x0654be30),
+ u64init (0xd192e819, 0xd6ef5218), u64init (0xd6990624, 0x5565a910),
+ u64init (0xf40e3585, 0x5771202a), u64init (0x106aa070, 0x32bbd1b8),
+ u64init (0x19a4c116, 0xb8d2d0c8), u64init (0x1e376c08, 0x5141ab53),
+ u64init (0x2748774c, 0xdf8eeb99), u64init (0x34b0bcb5, 0xe19b48a8),
+ u64init (0x391c0cb3, 0xc5c95a63), u64init (0x4ed8aa4a, 0xe3418acb),
+ u64init (0x5b9cca4f, 0x7763e373), u64init (0x682e6ff3, 0xd6b2b8a3),
+ u64init (0x748f82ee, 0x5defb2fc), u64init (0x78a5636f, 0x43172f60),
+ u64init (0x84c87814, 0xa1f0ab72), u64init (0x8cc70208, 0x1a6439ec),
+ u64init (0x90befffa, 0x23631e28), u64init (0xa4506ceb, 0xde82bde9),
+ u64init (0xbef9a3f7, 0xb2c67915), u64init (0xc67178f2, 0xe372532b),
+ u64init (0xca273ece, 0xea26619c), u64init (0xd186b8c7, 0x21c0c207),
+ u64init (0xeada7dd6, 0xcde0eb1e), u64init (0xf57d4f7f, 0xee6ed178),
+ u64init (0x06f067aa, 0x72176fba), u64init (0x0a637dc5, 0xa2c898a6),
+ u64init (0x113f9804, 0xbef90dae), u64init (0x1b710b35, 0x131c471b),
+ u64init (0x28db77f5, 0x23047d84), u64init (0x32caab7b, 0x40c72493),
+ u64init (0x3c9ebe0a, 0x15c9bebc), u64init (0x431d67c4, 0x9c100d4c),
+ u64init (0x4cc5d4be, 0xcb3e42b6), u64init (0x597f299c, 0xfc657e2a),
+ u64init (0x5fcb6fab, 0x3ad6faec), u64init (0x6c44198c, 0x4a475817),
+};
+
+/* Round functions.  F2 is the bitwise majority of its three arguments
+ and F1 selects, bit by bit, F where E is set and G where it is clear.
+ Both expand their arguments more than once. */
+#define F2(A, B, C) u64or (u64and (A, B), u64and (C, u64or (A, B)))
+#define F1(E, F, G) u64xor (G, u64and (E, u64xor (F, G)))
+
+/* Process LEN bytes of BUFFER, accumulating context into CTX.
+ It is assumed that LEN % 128 == 0.
+ Most of this code comes from GnuPG's cipher/sha1.c.
+ Each 128-byte block is loaded as sixteen words (through SWAP), run
+ through 80 rounds, and the result is added into ctx->state.  LEN is
+ also added to the two-word byte counter ctx->total.
+ NOTE(review): BUFFER is read through a u64 pointer, so it presumably
+ has to be aligned for u64 on hosts that require alignment -- confirm
+ for callers that pass plain char arrays. */
+
+void
+sha512_process_block (const void *buffer, size_t len, struct sha512_ctx *ctx)
+{
+ u64 const *words = (u64 const *)buffer;
+ u64 const *endp = words + len / sizeof (u64);
+ u64 x[16];
+ u64 a = ctx->state[0];
+ u64 b = ctx->state[1];
+ u64 c = ctx->state[2];
+ u64 d = ctx->state[3];
+ u64 e = ctx->state[4];
+ u64 f = ctx->state[5];
+ u64 g = ctx->state[6];
+ u64 h = ctx->state[7];
+
+ /* First increment the byte count. FIPS PUB 180-2 specifies the possible
+ length of the file up to 2^128 bits. Here we only compute the
+ number of bytes. Do a double word increment.  A sum smaller than LEN
+ after the addition means the low word wrapped, so carry one into the
+ high word. */
+ ctx->total[0] = u64plus (ctx->total[0], u64lo (len));
+ if (u64lt (ctx->total[0], u64lo (len)))
+ ctx->total[1] = u64plus (ctx->total[1], u64lo (1));
+
+#define S0(x) u64xor (u64rol(x, 63), u64xor (u64rol (x, 56), u64shr (x, 7)))
+#define S1(x) u64xor (u64rol (x, 45), u64xor (u64rol (x, 3), u64shr (x, 6)))
+#define SS0(x) u64xor (u64rol (x, 36), u64xor (u64rol (x, 30), u64rol (x, 25)))
+#define SS1(x) u64xor (u64rol(x, 50), u64xor (u64rol (x, 46), u64rol (x, 23)))
+
+#define M(I) (x[(I) & 15] \
+ = u64plus (x[(I) & 15], \
+ u64plus (S1 (x[((I) - 2) & 15]), \
+ u64plus (x[((I) - 7) & 15], \
+ S0 (x[((I) - 15) & 15])))))
+
+#define R(A, B, C, D, E, F, G, H, K, M) \
+ do \
+ { \
+ u64 t0 = u64plus (SS0 (A), F2 (A, B, C)); \
+ u64 t1 = \
+ u64plus (H, u64plus (SS1 (E), \
+ u64plus (F1 (E, F, G), u64plus (K, M)))); \
+ D = u64plus (D, t1); \
+ H = u64plus (t0, t1); \
+ } \
+ while (0)
+
+ while (words < endp)
+ {
+ int t;
+ /* FIXME: see sha1.c for a better implementation.
+ Load the sixteen message words of this block; x[] then serves as a
+ 16-entry circular schedule that M(I) updates in place. */
+ for (t = 0; t < 16; t++)
+ {
+ x[t] = SWAP (*words);
+ words++;
+ }
+
+ R( a, b, c, d, e, f, g, h, K( 0), x[ 0] ); /* the variable roles rotate by one position per round instead of moving values between variables */
+ R( h, a, b, c, d, e, f, g, K( 1), x[ 1] );
+ R( g, h, a, b, c, d, e, f, K( 2), x[ 2] );
+ R( f, g, h, a, b, c, d, e, K( 3), x[ 3] );
+ R( e, f, g, h, a, b, c, d, K( 4), x[ 4] );
+ R( d, e, f, g, h, a, b, c, K( 5), x[ 5] );
+ R( c, d, e, f, g, h, a, b, K( 6), x[ 6] );
+ R( b, c, d, e, f, g, h, a, K( 7), x[ 7] );
+ R( a, b, c, d, e, f, g, h, K( 8), x[ 8] );
+ R( h, a, b, c, d, e, f, g, K( 9), x[ 9] );
+ R( g, h, a, b, c, d, e, f, K(10), x[10] );
+ R( f, g, h, a, b, c, d, e, K(11), x[11] );
+ R( e, f, g, h, a, b, c, d, K(12), x[12] );
+ R( d, e, f, g, h, a, b, c, K(13), x[13] );
+ R( c, d, e, f, g, h, a, b, K(14), x[14] );
+ R( b, c, d, e, f, g, h, a, K(15), x[15] );
+ R( a, b, c, d, e, f, g, h, K(16), M(16) ); /* from round 16 on, each message word is derived from earlier ones by M */
+ R( h, a, b, c, d, e, f, g, K(17), M(17) );
+ R( g, h, a, b, c, d, e, f, K(18), M(18) );
+ R( f, g, h, a, b, c, d, e, K(19), M(19) );
+ R( e, f, g, h, a, b, c, d, K(20), M(20) );
+ R( d, e, f, g, h, a, b, c, K(21), M(21) );
+ R( c, d, e, f, g, h, a, b, K(22), M(22) );
+ R( b, c, d, e, f, g, h, a, K(23), M(23) );
+ R( a, b, c, d, e, f, g, h, K(24), M(24) );
+ R( h, a, b, c, d, e, f, g, K(25), M(25) );
+ R( g, h, a, b, c, d, e, f, K(26), M(26) );
+ R( f, g, h, a, b, c, d, e, K(27), M(27) );
+ R( e, f, g, h, a, b, c, d, K(28), M(28) );
+ R( d, e, f, g, h, a, b, c, K(29), M(29) );
+ R( c, d, e, f, g, h, a, b, K(30), M(30) );
+ R( b, c, d, e, f, g, h, a, K(31), M(31) );
+ R( a, b, c, d, e, f, g, h, K(32), M(32) );
+ R( h, a, b, c, d, e, f, g, K(33), M(33) );
+ R( g, h, a, b, c, d, e, f, K(34), M(34) );
+ R( f, g, h, a, b, c, d, e, K(35), M(35) );
+ R( e, f, g, h, a, b, c, d, K(36), M(36) );
+ R( d, e, f, g, h, a, b, c, K(37), M(37) );
+ R( c, d, e, f, g, h, a, b, K(38), M(38) );
+ R( b, c, d, e, f, g, h, a, K(39), M(39) );
+ R( a, b, c, d, e, f, g, h, K(40), M(40) );
+ R( h, a, b, c, d, e, f, g, K(41), M(41) );
+ R( g, h, a, b, c, d, e, f, K(42), M(42) );
+ R( f, g, h, a, b, c, d, e, K(43), M(43) );
+ R( e, f, g, h, a, b, c, d, K(44), M(44) );
+ R( d, e, f, g, h, a, b, c, K(45), M(45) );
+ R( c, d, e, f, g, h, a, b, K(46), M(46) );
+ R( b, c, d, e, f, g, h, a, K(47), M(47) );
+ R( a, b, c, d, e, f, g, h, K(48), M(48) );
+ R( h, a, b, c, d, e, f, g, K(49), M(49) );
+ R( g, h, a, b, c, d, e, f, K(50), M(50) );
+ R( f, g, h, a, b, c, d, e, K(51), M(51) );
+ R( e, f, g, h, a, b, c, d, K(52), M(52) );
+ R( d, e, f, g, h, a, b, c, K(53), M(53) );
+ R( c, d, e, f, g, h, a, b, K(54), M(54) );
+ R( b, c, d, e, f, g, h, a, K(55), M(55) );
+ R( a, b, c, d, e, f, g, h, K(56), M(56) );
+ R( h, a, b, c, d, e, f, g, K(57), M(57) );
+ R( g, h, a, b, c, d, e, f, K(58), M(58) );
+ R( f, g, h, a, b, c, d, e, K(59), M(59) );
+ R( e, f, g, h, a, b, c, d, K(60), M(60) );
+ R( d, e, f, g, h, a, b, c, K(61), M(61) );
+ R( c, d, e, f, g, h, a, b, K(62), M(62) );
+ R( b, c, d, e, f, g, h, a, K(63), M(63) );
+ R( a, b, c, d, e, f, g, h, K(64), M(64) );
+ R( h, a, b, c, d, e, f, g, K(65), M(65) );
+ R( g, h, a, b, c, d, e, f, K(66), M(66) );
+ R( f, g, h, a, b, c, d, e, K(67), M(67) );
+ R( e, f, g, h, a, b, c, d, K(68), M(68) );
+ R( d, e, f, g, h, a, b, c, K(69), M(69) );
+ R( c, d, e, f, g, h, a, b, K(70), M(70) );
+ R( b, c, d, e, f, g, h, a, K(71), M(71) );
+ R( a, b, c, d, e, f, g, h, K(72), M(72) );
+ R( h, a, b, c, d, e, f, g, K(73), M(73) );
+ R( g, h, a, b, c, d, e, f, K(74), M(74) );
+ R( f, g, h, a, b, c, d, e, K(75), M(75) );
+ R( e, f, g, h, a, b, c, d, K(76), M(76) );
+ R( d, e, f, g, h, a, b, c, K(77), M(77) );
+ R( c, d, e, f, g, h, a, b, K(78), M(78) );
+ R( b, c, d, e, f, g, h, a, K(79), M(79) );
+
+ a = ctx->state[0] = u64plus (ctx->state[0], a); /* add this block's result into the state and reload the working variables for the next block */
+ b = ctx->state[1] = u64plus (ctx->state[1], b);
+ c = ctx->state[2] = u64plus (ctx->state[2], c);
+ d = ctx->state[3] = u64plus (ctx->state[3], d);
+ e = ctx->state[4] = u64plus (ctx->state[4], e);
+ f = ctx->state[5] = u64plus (ctx->state[5], f);
+ g = ctx->state[6] = u64plus (ctx->state[6], g);
+ h = ctx->state[7] = u64plus (ctx->state[7], h);
+ }
+}
+
+#ifdef __cplusplus
+}
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/libs/signature/src/spas_client.c
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/libs/signature/src/spas_client.c b/rocketmq-cpp/libs/signature/src/spas_client.c
new file mode 100755
index 0000000..08b5a2d
--- /dev/null
+++ b/rocketmq-cpp/libs/signature/src/spas_client.c
@@ -0,0 +1,508 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+
+#include "spas_client.h"
+#include "sha1.h"
+#include "sha256.h"
+#include "hmac.h"
+#include "base64.h"
+
+#ifdef WIN32
+#include <io.h>
+#include <process.h>
+#else
+#include <unistd.h>
+#endif
+#ifdef SPAS_MT
+#include <pthread.h>
+#endif
+
+#ifdef __cplusplus
+namespace metaqSignature{
+#endif
+
+#define SPAS_VERSION "SPAS_V1_0"
+
+static SPAS_CREDENTIAL g_credential;
+static char g_path[SPAS_MAX_PATH];
+static int g_loaded = 0;
+static unsigned int refresh = 10;
+static time_t modified = 0;
+
+#ifdef SPAS_MT
+
+static pthread_mutex_t cred_mutex = PTHREAD_MUTEX_INITIALIZER;
+static pthread_once_t cred_once = PTHREAD_ONCE_INIT;
+static pthread_key_t cred_key;
+
+#endif
+
+extern void * _mem_alloc(unsigned int size);
+extern void * _mem_realloc(void *ptr, unsigned int old_size, unsigned int new_size);
+extern void _mem_free(void *ptr);
+extern void _trim(char *str);
+
+
+/* Allocate SIZE bytes with malloc and clear them to zero.
+ * Returns the new block, or NULL when malloc fails. */
+void * _mem_alloc(unsigned int size) {
+    void *block = malloc(size);
+
+    if (block == NULL) {
+        return NULL;
+    }
+    memset(block, 0, size);
+    return block;
+}
+
+/* Resize PTR from OLD_SIZE to NEW_SIZE bytes with realloc and, when the
+ * block grew, zero the newly added tail [OLD_SIZE, NEW_SIZE).
+ * Returns the resized block, or NULL when realloc fails (PTR is then
+ * still valid and unchanged, as with realloc itself). */
+void * _mem_realloc(void *ptr, unsigned int old_size, unsigned int new_size) {
+    void *p = realloc(ptr, new_size);
+    if (p != NULL && new_size > old_size) {
+        /* The offset is in bytes, so step through a char pointer.  Using
+         * an unsigned int pointer would scale OLD_SIZE by
+         * sizeof(unsigned int): the tail would stay uninitialised and
+         * the memset would run past the end of the allocation. */
+        memset((char *)p + old_size, 0, new_size - old_size);
+    }
+    return p;
+}
+
+
+void _mem_free(void *ptr) { /* Release a block obtained from _mem_alloc or _mem_realloc; simply forwards PTR to free(). */
+ free(ptr);
+}
+
+/* Strip trailing blanks from STR in place: every space, tab, carriage
+ * return or newline at the end of the string is overwritten with '\0'.
+ * Leading and interior whitespace is left alone. */
+void _trim(char *str) {
+    int end = strlen(str);
+
+    while (end > 0) {
+        char last = str[end - 1];
+        if (last != ' ' && last != '\t' && last != '\r' && last != '\n') {
+            return;
+        }
+        end--;
+        str[end] = '\0';
+    }
+}
+
+/* Read the access and secret keys from the text file PATH into PCRED.
+ * PCRED is zeroed first; each line is right-trimmed and then matched
+ * against "<SPAS_ACCESS_KEY_TAG>=" or "<SPAS_SECRET_KEY_TAG>=", and the
+ * text after '=' is copied (at most SPAS_MAX_KEY_LEN - 1 characters).
+ * A later matching line overwrites an earlier one.
+ * Returns NO_ERROR, ERROR_INVALID_PARAM for a NULL argument,
+ * ERROR_FILE_OPEN when PATH cannot be opened, or ERROR_MISSING_KEY when
+ * either key is still empty after the whole file has been read. */
+static int _load_credential(SPAS_CREDENTIAL *pcred, char *path) {
+    char line[SPAS_MAX_KEY_LEN * 2];
+    FILE *file = NULL;
+
+    if (pcred == NULL || path == NULL) {
+        return ERROR_INVALID_PARAM;
+    }
+    file = fopen(path, "r");
+    if (file == NULL) {
+        return ERROR_FILE_OPEN;
+    }
+    memset(pcred, 0, sizeof(SPAS_CREDENTIAL));
+    while (fgets(line, sizeof(line), file)) {
+        int tag_len;
+
+        _trim(line);
+        tag_len = strlen(SPAS_ACCESS_KEY_TAG);
+        if (strncmp(line, SPAS_ACCESS_KEY_TAG, tag_len) == 0 && line[tag_len] == '=') {
+            /* pcred was zeroed above, so the copy stays NUL-terminated. */
+            strncpy(pcred->access_key, line + tag_len + 1, SPAS_MAX_KEY_LEN - 1);
+            continue;
+        }
+        tag_len = strlen(SPAS_SECRET_KEY_TAG);
+        if (strncmp(line, SPAS_SECRET_KEY_TAG, tag_len) == 0 && line[tag_len] == '=') {
+            strncpy(pcred->secret_key, line + tag_len + 1, SPAS_MAX_KEY_LEN - 1);
+        }
+    }
+    fclose(file);
+    if (pcred->access_key[0] == '\0' || pcred->secret_key[0] == '\0') {
+        return ERROR_MISSING_KEY;
+    }
+    return NO_ERROR;
+}
+
+#ifndef WIN32
+static void _reload_credential(int sig) { /* SIGALRM handler: re-installs itself,
+ re-arms the alarm for `refresh` seconds, then reloads g_credential from
+ g_path when stat() reports an st_mtime different from the cached
+ `modified`.  Any failure (stat or load) returns early and leaves
+ g_credential untouched.
+ NOTE(review): this runs in signal context yet reaches fopen/fgets (via
+ _load_credential) and, with SPAS_MT, pthread_mutex_lock; POSIX does not
+ list those as async-signal-safe -- confirm this is acceptable. */
+ int ret;
+ SPAS_CREDENTIAL credential;
+ struct stat status;
+ struct sigaction act;
+
+ if (sig != SIGALRM) {
+ return;
+ }
+
+ memset(&act, 0, sizeof(act));
+ act.sa_handler = _reload_credential;
+ sigaction(SIGALRM, &act, NULL);
+ alarm(refresh); /* schedule the next check before doing any work, so early returns below do not stop the polling */
+ if (g_path[0] != '\0') {
+ ret = stat(g_path, &status);
+ if (ret != 0) {
+ return;
+ }
+ if (status.st_mtime == modified) { /* file unchanged since the last successful reload */
+ return;
+ }
+ ret = _load_credential(&credential, g_path); /* parse into a local copy first; the global is replaced only on success */
+ if (ret != NO_ERROR) {
+ return;
+ }
+#ifdef SPAS_MT
+ pthread_mutex_lock(&cred_mutex);
+#endif
+ memcpy(&g_credential, &credential, sizeof(SPAS_CREDENTIAL));
+#ifdef SPAS_MT
+ pthread_mutex_unlock(&cred_mutex);
+#endif
+ modified = status.st_mtime;
+ }
+}
+
+/* Start alarm-driven credential refresh: install _reload_credential as
+ * the SIGALRM handler and arm the first alarm for `refresh` seconds.
+ * The results of sigaction() and alarm() are not checked; the function
+ * always returns NO_ERROR. */
+static int _update_credential_by_alarm() {
+    struct sigaction handler_cfg;
+
+    memset(&handler_cfg, 0, sizeof(handler_cfg));
+    handler_cfg.sa_handler = _reload_credential;
+    sigaction(SIGALRM, &handler_cfg, NULL);
+
+    alarm(refresh);
+    return NO_ERROR;
+}
+#endif
+
+#ifdef SPAS_MT
+
+/* Thread body for background credential refresh.  Forever: sleep
+ * `refresh` seconds (via select with no descriptors), then, if g_path is
+ * set and the file's st_mtime differs from the cached `modified`, parse
+ * it into a local copy and publish that copy to g_credential under
+ * cred_mutex.  A failed stat or load just skips this round.
+ * ARG is unused; the loop never exits. */
+static void * _update_credential_entry(void *arg) {
+    SPAS_CREDENTIAL fresh;
+    struct stat file_info;
+    struct timeval delay;
+
+    for (;;) {
+        delay.tv_sec = refresh;
+        delay.tv_usec = 0;
+        select(0, NULL, NULL, NULL, &delay);
+
+        if (g_path[0] == '\0') {
+            continue;
+        }
+        if (stat(g_path, &file_info) != 0) {
+            continue;
+        }
+        if (file_info.st_mtime == modified) {
+            continue;
+        }
+        if (_load_credential(&fresh, g_path) != NO_ERROR) {
+            continue;
+        }
+
+        pthread_mutex_lock(&cred_mutex);
+        memcpy(&g_credential, &fresh, sizeof(SPAS_CREDENTIAL));
+        pthread_mutex_unlock(&cred_mutex);
+        modified = file_info.st_mtime;
+    }
+    return NULL;
+}
+
+/* Start thread-driven credential refresh: spawn a detached thread
+ * running _update_credential_entry.
+ * Returns NO_ERROR, or ERROR_UPDATE_CREDENTIAL when pthread_create
+ * fails. */
+static int _update_credential_by_thread() {
+    pthread_t worker;
+
+    if (pthread_create(&worker, NULL, _update_credential_entry, NULL) != 0) {
+        return ERROR_UPDATE_CREDENTIAL;
+    }
+    pthread_detach(worker);
+    return NO_ERROR;
+}
+
+
+
+/* Load the process-wide credential from PATH (or, when PATH is NULL,
+ * from the file named by the SPAS_CREDENTIAL_ENV environment variable)
+ * and optionally start a periodic refresh selected by MODE.
+ * Once a credential has been loaded, later calls return NO_ERROR
+ * without doing anything.
+ * Returns NO_ERROR, ERROR_NO_CREDENTIAL when no path is available, the
+ * error from _load_credential, or the result of starting the refresh.
+ * NOTE(review): in this file the definition sits inside the
+ * `#ifdef SPAS_MT` region opened before _update_credential_entry, so it
+ * is compiled only for SPAS_MT builds even though its body carries its
+ * own SPAS_MT guards -- confirm that is intended. */
+int spas_load_credential(char *path, CREDENTIAL_UPDATE_MODE mode) {
+    SPAS_CREDENTIAL loaded;
+    int rc;
+
+    if (g_loaded) {
+        return NO_ERROR;
+    }
+    if (path == NULL) {
+        path = getenv(SPAS_CREDENTIAL_ENV);
+    }
+    if (path == NULL) {
+        return ERROR_NO_CREDENTIAL;
+    }
+
+    /* Remember the path for the refresh handlers before parsing it. */
+    strncpy(g_path, path, SPAS_MAX_PATH - 1);
+    rc = _load_credential(&loaded, path);
+    if (rc != NO_ERROR) {
+        return rc;
+    }
+
+#ifdef SPAS_MT
+    pthread_mutex_lock(&cred_mutex);
+#endif
+    /* Re-check under the lock: another caller may have won the race. */
+    if (!g_loaded) {
+        memcpy(&g_credential, &loaded, sizeof(SPAS_CREDENTIAL));
+        g_loaded = 1;
+    }
+#ifdef SPAS_MT
+    pthread_mutex_unlock(&cred_mutex);
+#endif
+
+    if (mode == UPDATE_BY_ALARM) {
+        return _update_credential_by_alarm();
+    }
+#ifdef SPAS_MT
+    if (mode == UPDATE_BY_THREAD) {
+        return _update_credential_by_thread();
+    }
+#endif
+    /* NO_UPDATE and any unrecognised mode: nothing to start. */
+    return NO_ERROR;
+}
+
+#endif
+
+/* Return a heap-allocated snapshot of the process-wide credential, or
+ * NULL when the allocation fails.  The copy is taken under cred_mutex in
+ * SPAS_MT builds.  The block comes from _mem_alloc, so it is released
+ * with free() (e.g. through _mem_free). */
+SPAS_CREDENTIAL * spas_get_credential(void) {
+    SPAS_CREDENTIAL *snapshot = (SPAS_CREDENTIAL *)_mem_alloc(sizeof(SPAS_CREDENTIAL));
+
+    if (snapshot == NULL) {
+        return NULL;
+    }
+#ifdef SPAS_MT
+    pthread_mutex_lock(&cred_mutex);
+#endif
+    memcpy(snapshot, &g_credential, sizeof(SPAS_CREDENTIAL));
+#ifdef SPAS_MT
+    pthread_mutex_unlock(&cred_mutex);
+#endif
+    return snapshot;
+}
+
+
+/* Set the access key of the process-wide credential to a copy of KEY.
+ * Returns NO_ERROR, ERROR_INVALID_PARAM when KEY is NULL, or
+ * ERROR_KEY_LENGTH when KEY is empty or does not fit in
+ * SPAS_MAX_KEY_LEN bytes including its terminator. */
+int spas_set_access_key(char *key) {
+    int key_len;
+
+    if (key == NULL) {
+        return ERROR_INVALID_PARAM;
+    }
+    key_len = strlen(key);
+    if (key_len == 0) {
+        return ERROR_KEY_LENGTH;
+    }
+    if (key_len >= SPAS_MAX_KEY_LEN) {
+        return ERROR_KEY_LENGTH;
+    }
+#ifdef SPAS_MT
+    pthread_mutex_lock(&cred_mutex);
+#endif
+    /* + 1 copies the terminating NUL as well. */
+    memcpy(g_credential.access_key, key, key_len + 1);
+#ifdef SPAS_MT
+    pthread_mutex_unlock(&cred_mutex);
+#endif
+    return NO_ERROR;
+}
+
+/* Set the secret key of the process-wide credential to a copy of KEY.
+ * Returns NO_ERROR, ERROR_INVALID_PARAM when KEY is NULL, or
+ * ERROR_KEY_LENGTH when KEY is empty or does not fit in
+ * SPAS_MAX_KEY_LEN bytes including its terminator. */
+int spas_set_secret_key(char *key) {
+    int key_len;
+
+    if (key == NULL) {
+        return ERROR_INVALID_PARAM;
+    }
+    key_len = strlen(key);
+    if (key_len == 0) {
+        return ERROR_KEY_LENGTH;
+    }
+    if (key_len >= SPAS_MAX_KEY_LEN) {
+        return ERROR_KEY_LENGTH;
+    }
+#ifdef SPAS_MT
+    pthread_mutex_lock(&cred_mutex);
+#endif
+    /* + 1 copies the terminating NUL as well. */
+    memcpy(g_credential.secret_key, key, key_len + 1);
+#ifdef SPAS_MT
+    pthread_mutex_unlock(&cred_mutex);
+#endif
+    return NO_ERROR;
+}
+
+char * spas_get_access_key() { /* Returns a pointer to the access_key field of the file-scope g_credential itself, not a copy. NOTE(review): the setters and reloaders write that field under cred_mutex, but this read takes no lock. */
+ return g_credential.access_key;
+}
+
+char * spas_get_secret_key() { /* Returns a pointer to the secret_key field of the file-scope g_credential itself, not a copy. NOTE(review): the setters and reloaders write that field under cred_mutex, but this read takes no lock. */
+ return g_credential.secret_key;
+}
+
+#ifdef SPAS_MT
+
+/* Destructor registered with pthread_key_create: releases a thread's
+ * credential block.  _mem_free forwards to free(), which accepts NULL,
+ * so no separate NULL test is needed. */
+static void _free_thread_credential(void* credential)
+{
+    _mem_free(credential);
+}
+
+static void _init_credential_key(void) { /* pthread_once callback: creates cred_key with _free_thread_credential as its destructor. The result of pthread_key_create is not checked. */
+ pthread_key_create(&cred_key, _free_thread_credential);
+}
+
+/* Return the calling thread's private credential, creating a zeroed one
+ * on first use and attaching it to cred_key so that
+ * _free_thread_credential releases it through the key's destructor.
+ * Returns NULL when the one-time key setup, the allocation, or
+ * pthread_setspecific fails. */
+static SPAS_CREDENTIAL * _get_thread_credential(void) {
+    int ret = 0;
+    SPAS_CREDENTIAL *credential = NULL;
+    ret = pthread_once(&cred_once, _init_credential_key);
+    if (ret != 0) {
+        return NULL;
+    }
+    /* Explicit casts: this file is wrapped in a namespace when built as
+     * C++, where void * does not convert implicitly; spas_get_credential
+     * casts its _mem_alloc result the same way. */
+    credential = (SPAS_CREDENTIAL *)pthread_getspecific(cred_key);
+    if (credential == NULL) {
+        credential = (SPAS_CREDENTIAL *)_mem_alloc(sizeof(SPAS_CREDENTIAL));
+        if (credential == NULL) {
+            return NULL;
+        }
+        ret = pthread_setspecific(cred_key, credential);
+        if (ret != 0) {
+            /* Not attached to the key, so nothing else would free it. */
+            _mem_free(credential);
+            return NULL;
+        }
+    }
+    return credential;
+}
+
+/* Load the calling thread's private credential from the file PATH.
+ * Returns NO_ERROR, ERROR_MEM_ALLOC when the per-thread slot cannot be
+ * obtained, or the error from _load_credential; in that last case the
+ * thread's credential is cleared to all zeros. */
+int spas_load_thread_credential(char *path) {
+    SPAS_CREDENTIAL *mine = _get_thread_credential();
+    int rc;
+
+    if (mine == NULL) {
+        return ERROR_MEM_ALLOC;
+    }
+    rc = _load_credential(mine, path);
+    if (rc == NO_ERROR) {
+        return NO_ERROR;
+    }
+    memset(mine, 0, sizeof(SPAS_CREDENTIAL));
+    return rc;
+}
+
+/* Set the access key of the calling thread's private credential to a
+ * copy of KEY.
+ * Returns NO_ERROR, ERROR_INVALID_PARAM when KEY is NULL,
+ * ERROR_KEY_LENGTH when KEY is empty or does not fit in
+ * SPAS_MAX_KEY_LEN bytes including its terminator, or ERROR_MEM_ALLOC
+ * when the per-thread slot cannot be obtained. */
+int spas_set_thread_access_key(char *key) {
+    SPAS_CREDENTIAL *mine;
+    int key_len;
+
+    if (key == NULL) {
+        return ERROR_INVALID_PARAM;
+    }
+    key_len = strlen(key);
+    if (key_len == 0) {
+        return ERROR_KEY_LENGTH;
+    }
+    if (key_len >= SPAS_MAX_KEY_LEN) {
+        return ERROR_KEY_LENGTH;
+    }
+    mine = _get_thread_credential();
+    if (mine == NULL) {
+        return ERROR_MEM_ALLOC;
+    }
+    /* + 1 copies the terminating NUL as well. */
+    memcpy(mine->access_key, key, key_len + 1);
+    return NO_ERROR;
+}
+
+/* Set the secret key of the calling thread's private credential to a
+ * copy of KEY.
+ * Returns NO_ERROR, ERROR_INVALID_PARAM when KEY is NULL,
+ * ERROR_KEY_LENGTH when KEY is empty or does not fit in
+ * SPAS_MAX_KEY_LEN bytes including its terminator, or ERROR_MEM_ALLOC
+ * when the per-thread slot cannot be obtained. */
+int spas_set_thread_secret_key(char *key) {
+    SPAS_CREDENTIAL *mine;
+    int key_len;
+
+    if (key == NULL) {
+        return ERROR_INVALID_PARAM;
+    }
+    key_len = strlen(key);
+    if (key_len == 0) {
+        return ERROR_KEY_LENGTH;
+    }
+    if (key_len >= SPAS_MAX_KEY_LEN) {
+        return ERROR_KEY_LENGTH;
+    }
+    mine = _get_thread_credential();
+    if (mine == NULL) {
+        return ERROR_MEM_ALLOC;
+    }
+    /* + 1 copies the terminating NUL as well. */
+    memcpy(mine->secret_key, key, key_len + 1);
+    return NO_ERROR;
+}
+
+/* Return a pointer to the access key stored in the calling thread's
+ * private credential (the stored buffer itself, not a copy), or NULL
+ * when the per-thread slot cannot be obtained. */
+char * spas_get_thread_access_key(void) {
+    SPAS_CREDENTIAL *mine = _get_thread_credential();
+
+    return (mine != NULL) ? mine->access_key : NULL;
+}
+
+/* Return a pointer to the secret key stored in the calling thread's
+ * private credential (the stored buffer itself, not a copy), or NULL
+ * when the per-thread slot cannot be obtained. */
+char * spas_get_thread_secret_key(void) {
+    SPAS_CREDENTIAL *mine = _get_thread_credential();
+
+    return (mine != NULL) ? mine->secret_key : NULL;
+}
+
+#endif
+
+
+char * spas_get_signature(const SPAS_PARAM_LIST *list, const char *key) { /* Convenience wrapper: same as spas_get_signature2 with the SIGN_HMACSHA1 algorithm. */
+ return spas_get_signature2(list, key, SIGN_HMACSHA1);
+}
+
+/* Sign a parameter list: serialise LIST with param_list_to_str, sign
+ * that string with KEY using ALGORITHM via spas_sign2, and free the
+ * intermediate string.
+ * Returns whatever spas_sign2 returns, or NULL when LIST or KEY is NULL
+ * or the list cannot be serialised. */
+char * spas_get_signature2(const SPAS_PARAM_LIST *list, const char *key, SPAS_SIGN_ALGORITHM algorithm) {
+    char *serialized;
+    char *signature;
+
+    if (list == NULL || key == NULL) {
+        return NULL;
+    }
+    serialized = param_list_to_str(list);
+    if (serialized == NULL) {
+        return NULL;
+    }
+    signature = spas_sign2(serialized, strlen(serialized), key, algorithm);
+    _mem_free(serialized);
+    return signature;
+}
+
+char * spas_sign(const char *data, size_t size, const char *key) { /* Convenience wrapper: same as spas_sign2 with the SIGN_HMACSHA1 algorithm. */
+ return spas_sign2(data, size, key, SIGN_HMACSHA1);
+}
+
+/* Sign SIZE bytes at DATA with the NUL-terminated KEY and return the
+ * Base64 text of the HMAC.
+ * ALGORITHM selects HMAC-SHA1 (SIGN_HMACSHA1) or HMAC-SHA256
+ * (SIGN_HMACSHA256); any other value yields NULL.
+ * Returns the string produced by base64_encode_alloc, or NULL when DATA
+ * or KEY is NULL, the digest buffer cannot be allocated, the HMAC
+ * routine reports an error, or the encoder leaves its output NULL.
+ * This file offers spas_mem_free for releasing the result. */
+char * spas_sign2(const char *data, size_t size, const char *key, SPAS_SIGN_ALGORITHM algorithm) {
+    int ret;
+    int dsize = 0;
+    char *sha_buf = NULL;
+    char *base64_ret = NULL;
+
+    if (data == NULL || key == NULL) {
+        return NULL;
+    }
+    if (algorithm == SIGN_HMACSHA1) {
+        dsize = SHA1_DIGEST_SIZE;
+    }
+    else if (algorithm == SIGN_HMACSHA256) {
+        dsize = SHA256_DIGEST_SIZE;
+    }
+    else {
+        return NULL;
+    }
+
+    sha_buf = (char *)_mem_alloc(dsize + 1);
+    if (sha_buf == NULL) {
+        return NULL;
+    }
+    /* Both algorithms hash exactly SIZE bytes.  The SHA256 path used to
+     * hash strlen(data) bytes instead, which ignores the caller's length
+     * and misbehaves for data that is not NUL-terminated or that
+     * contains embedded NUL bytes. */
+    if (algorithm == SIGN_HMACSHA1) {
+        ret = hmac_sha1(key, strlen(key), data, size, sha_buf);
+    }
+    else {
+        ret = hmac_sha256(key, strlen(key), data, size, sha_buf);
+    }
+    if (ret < 0) {
+        _mem_free(sha_buf);
+        return NULL;
+    }
+
+    /* base64_ret starts as NULL and is returned as the encoder left it. */
+    base64_encode_alloc(sha_buf, dsize, &base64_ret);
+    _mem_free(sha_buf);
+    return base64_ret;
+}
+
+void spas_mem_free(char *pSignature) /* Public release function: hands PSIGNATURE to _mem_free, which calls free(). */
+{
+ _mem_free(pSignature);
+}
+
+char * spas_get_version(void) { /* Returns the SPAS_VERSION string literal; the caller must neither modify nor free it. */
+ return SPAS_VERSION;
+}
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/project/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/project/CMakeLists.txt b/rocketmq-cpp/project/CMakeLists.txt
new file mode 100755
index 0000000..02723af
--- /dev/null
+++ b/rocketmq-cpp/project/CMakeLists.txt
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# source files
+project(rocketmq4cpp)
+
+# Every file under src/ is compiled, except dllmain.cpp which is removed from the list.
+file(GLOB_RECURSE SRC_FILES ${CMAKE_SOURCE_DIR}/src/*)
+list(REMOVE_ITEM SRC_FILES ${CMAKE_SOURCE_DIR}/src/dllmain.cpp)
+
+# subdirs
+# Collect src/ and each directory directly below it so they can be added as include paths.
+SET(SUB_DIRS)
+file(GLOB children ${CMAKE_SOURCE_DIR}/src/*)
+FOREACH(child ${children})
+ IF(IS_DIRECTORY ${child})
+ LIST(APPEND SUB_DIRS ${child})
+ ENDIF()
+ENDFOREACH()
+LIST(APPEND SUB_DIRS ${CMAKE_SOURCE_DIR}/src)
+
+include_directories(${CMAKE_SOURCE_DIR}/include)
+include_directories(${SUB_DIRS})
+
+# libs_directories
+# Each directory under libs/ is appended to CMAKE_PREFIX_PATH and its include/ is added as an include path.
+file(GLOB LIB_DIRS ${CMAKE_SOURCE_DIR}/libs/*)
+foreach(dir ${LIB_DIRS})
+ if(IS_DIRECTORY ${dir})
+ set(CMAKE_PREFIX_PATH ${CMAKE_PREFIX_PATH};${dir})
+ include_directories(${dir}/include)
+ endif()
+endforeach()
+
+# static
+# NOTE(review): ${deplibs} and the Signature target are not defined in this file; presumably set by a parent CMakeLists - confirm.
+add_library(rocketmq_static STATIC ${SRC_FILES})
+set_target_properties(rocketmq_static PROPERTIES OUTPUT_NAME "rocketmq")
+add_dependencies(rocketmq_static Signature)
+target_link_libraries(rocketmq_static ${deplibs})
+target_link_libraries(rocketmq_static Signature)
+
+# shared
+# NOTE(review): -DBOOST_ALL_DYN_LINK is a preprocessor define but is passed as a linker flag here,
+# and this assignment replaces any CMAKE_SHARED_LINKER_FLAGS value set earlier - confirm intent.
+set(CMAKE_SHARED_LINKER_FLAGS "-DBOOST_ALL_DYN_LINK -shared")
+add_library(rocketmq_shared SHARED ${SRC_FILES})
+set_target_properties(rocketmq_shared PROPERTIES OUTPUT_NAME "rocketmq")
+add_dependencies(rocketmq_shared Signature)
+target_link_libraries(rocketmq_shared ${deplibs})
+target_link_libraries(rocketmq_shared Signature)
+
+# install
+# NOTE(review): only rocketmq_static is installed, and into "bin"; rocketmq_shared has no install rule - confirm.
+install (TARGETS rocketmq_static DESTINATION bin)
+install (DIRECTORY ${CMAKE_SOURCE_DIR}/include/ DESTINATION include)
+install (DIRECTORY ${CMAKE_SOURCE_DIR}/doc/ DESTINATION doc)
+
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/project/tool.mak
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/project/tool.mak b/rocketmq-cpp/project/tool.mak
new file mode 100644
index 0000000..e8f57fc
--- /dev/null
+++ b/rocketmq-cpp/project/tool.mak
@@ -0,0 +1,21 @@
+# BUILD_LIBRARY: recipe body that rebuilds the archive $@ from its prerequisites ($^).
+#  1. Removes an existing $@ and any leftover ar.mac script.
+#  2. If any prerequisite is a .a file, creates $@ through an ar script (CREATE/SAVE/END fed to "$(AR) -M").
+#  3. Appends all .o prerequisites with "$(AR) -q".
+#  4. If any prerequisite is a .a file, merges each one into $@ with ADDLIB via a second ar script,
+#     then deletes ar.mac.
+define BUILD_LIBRARY
+$(if $(wildcard $@),@$(RM) $@)
+$(if $(wildcard ar.mac),@$(RM) ar.mac)
+$(if $(filter %.a, $^),
+@echo CREATE $@ > ar.mac
+@echo SAVE >> ar.mac
+@echo END >> ar.mac
+@$(AR) -M < ar.mac
+)
+$(if $(filter %.o,$^),@$(AR) -q $@ $(filter %.o, $^))
+$(if $(filter %.a, $^),
+@echo OPEN $@ > ar.mac
+$(foreach LIB, $(filter %.a, $^),
+@echo ADDLIB $(LIB) >> ar.mac
+)
+@echo SAVE >> ar.mac
+@echo END >> ar.mac
+@$(AR) -M < ar.mac
+@$(RM) ar.mac
+)
+endef
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/MQClientAPIImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/MQClientAPIImpl.cpp b/rocketmq-cpp/src/MQClientAPIImpl.cpp
new file mode 100755
index 0000000..2d9c39c
--- /dev/null
+++ b/rocketmq-cpp/src/MQClientAPIImpl.cpp
@@ -0,0 +1,922 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MQClientAPIImpl.h"
+#include <assert.h>
+#include <fstream>
+#include "CommunicationMode.h"
+#include "Logging.h"
+#include "MQDecoder.h"
+#include "PullResultExt.h"
+
+namespace rocketmq {
+//<!************************************************************************
+// Builds the TCP remoting client, registers clientRemotingProcessor for the
+// six broker-initiated request codes listed below, and creates the
+// TopAddressing helper for the given unit name.
+MQClientAPIImpl::MQClientAPIImpl(
+    const string& mqClientId, ClientRemotingProcessor* clientRemotingProcessor,
+    int pullThreadNum, uint64_t tcpConnectTimeout,
+    uint64_t tcpTransportTryLockTimeout, string unitName)
+    : m_firstFetchNameSrv(true), m_mqClientId(mqClientId) {
+  TcpRemotingClient* remotingClient = new TcpRemotingClient(
+      pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout);
+  m_pRemotingClient.reset(remotingClient);
+
+  remotingClient->registerProcessor(CHECK_TRANSACTION_STATE,
+                                    clientRemotingProcessor);
+  remotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET,
+                                    clientRemotingProcessor);
+  remotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT,
+                                    clientRemotingProcessor);
+  remotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO,
+                                    clientRemotingProcessor);
+  remotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED,
+                                    clientRemotingProcessor);
+  remotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY,
+                                    clientRemotingProcessor);
+
+  m_topAddressing.reset(new TopAddressing(unitName));
+}
+
+// Drops the remoting client first, then the addressing helper.
+MQClientAPIImpl::~MQClientAPIImpl() {
+  m_pRemotingClient.reset();
+  m_topAddressing.reset();
+}
+
+// Forwards to the remoting client's stopAllTcpTransportThread().
+void MQClientAPIImpl::stopAllTcpTransportThread() {
+ m_pRemotingClient->stopAllTcpTransportThread();
+}
+
+// Writes `data` to `filename`, creating the file if needed and replacing any
+// previous content. When isSync is true the file is fsync'ed before closing.
+// Returns false if data is empty, the file cannot be opened, or a write
+// fails; returns true otherwise.
+bool MQClientAPIImpl::writeDataToFile(string filename, string data,
+                                      bool isSync) {
+  if (data.empty()) return false;
+
+  // O_TRUNC: without it a shorter payload would leave stale bytes from an
+  // older, longer file behind the new content.
+  int fd = open(filename.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0755);
+  if (fd < 0) {
+    LOG_ERROR("open file failed, file:%s, msg:%s", filename.c_str(),
+              strerror(errno));
+    return false;
+  }
+
+  const char* pData = data.data();
+  size_t bytesLeft = data.size();
+  while (bytesLeft > 0) {
+    ssize_t written = write(fd, pData, bytesLeft);
+    if (written < 0) {
+      // An interrupted write transferred nothing; just retry it.
+      if (errno == EINTR) continue;
+      LOG_ERROR("write data fail, data len:%zu, file:%s, msg:%s", data.size(),
+                filename.c_str(), strerror(errno));
+      close(fd);
+      return false;
+    }
+    bytesLeft -= static_cast<size_t>(written);
+    pData += written;
+  }
+
+  if (isSync) {
+    LOG_INFO("fsync with filename:%s", filename.c_str());
+    if (fsync(fd) != 0) {
+      // Best effort, as before: report the failure but keep the result.
+      LOG_WARN("fsync failed, file:%s, msg:%s", filename.c_str(),
+               strerror(errno));
+    }
+  }
+  close(fd);
+
+  return true;
+}
+
+// Resolves the name server address list for NSDomain and keeps a snapshot
+// file under <home>/logs/metaq-client4cpp/snapshot.
+//  - If the lookup returns nothing and no address is cached yet, the
+//    snapshot file (when present) is loaded and used.
+//  - If the lookup returns an address that differs from the cached one, the
+//    remoting client and the cache are updated and the snapshot is rewritten
+//    (written to a .bak file, then renamed).
+//  - If the snapshot file is missing afterwards, it is recreated from the
+//    cached address.
+// Exceptions are logged and not propagated. Returns the cached address.
+string MQClientAPIImpl::fetchNameServerAddr(const string& NSDomain) {
+  try {
+    string homeDir(UtilAll::getHomeDirectory());
+    string storePath = homeDir + "/logs/metaq-client4cpp/snapshot";
+
+    if (access(storePath.c_str(), F_OK) != 0) {
+      if (mkdir(storePath.c_str(), S_IRWXU | S_IRWXG | S_IRWXO) != 0) {
+        LOG_ERROR("create data dir:%s error", storePath.c_str());
+      }
+    }
+
+    // The snapshot file name carries the part of the client id after '@'.
+    string file(storePath);
+    string fileBak(storePath);
+    vector<string> ret_;
+    int retSize = UtilAll::Split(ret_, m_mqClientId, "@");
+    if (retSize == 2) {
+      file.append("/nameserver_addr-").append(ret_[retSize - 1]);
+    } else {
+      LOG_ERROR("split mqClientId:%s fail", m_mqClientId.c_str());
+      file.append("/nameserver_addr-DEFAULT");
+    }
+    fileBak.append("/nameserver_addr.bak");
+
+    const string addrs = m_topAddressing->fetchNSAddr(NSDomain);
+    if (addrs.empty()) {
+      if (m_nameSrvAddr.empty()) {
+        LOG_INFO("Load the name server snapshot local file:%s", file.c_str());
+        if (access(file.c_str(), F_OK) == 0) {
+          ifstream snapshot_file(file, ios::binary);
+          istreambuf_iterator<char> beg(snapshot_file), end;
+          string filecontent(beg, end);
+          updateNameServerAddr(filecontent);
+          m_nameSrvAddr = filecontent;
+        } else {
+          LOG_WARN("The name server snapshot local file not exists");
+        }
+      }
+    } else {
+      m_firstFetchNameSrv = false;
+      // Unchanged address: nothing to update and nothing to persist.
+      if (addrs.compare(m_nameSrvAddr) == 0) return m_nameSrvAddr;
+
+      LOG_INFO("name server address changed, old: %s, new: %s",
+               m_nameSrvAddr.c_str(), addrs.c_str());
+      updateNameServerAddr(addrs);
+      m_nameSrvAddr = addrs;
+
+      // Persist via a .bak file and rename it over the snapshot.
+      if (writeDataToFile(fileBak, addrs, true)) {
+        if (rename(fileBak.c_str(), file.c_str()) == -1)
+          LOG_ERROR("could not rename bak file:%s", strerror(errno));
+      }
+    }
+
+    if (access(file.c_str(), F_OK) != 0) {
+      // the name server snapshot local file maybe deleted by force, create it
+      if (writeDataToFile(fileBak, m_nameSrvAddr, true)) {
+        if (rename(fileBak.c_str(), file.c_str()) == -1)
+          LOG_ERROR("could not rename bak file:%s", strerror(errno));
+      }
+    }
+  } catch (...) {
+    // Still best effort, but no longer silent.
+    LOG_ERROR("fetchNameServerAddr failed with an unexpected exception");
+  }
+  return m_nameSrvAddr;
+}
+
+// Passes the address list to the remoting client, if one exists.
+void MQClientAPIImpl::updateNameServerAddr(const string& addrs) {
+  if (!m_pRemotingClient) {
+    return;
+  }
+  m_pRemotingClient->updateNameServerAddressList(addrs);
+}
+
+// Runs a ClientRPCHook built from the session credentials over the request
+// before it is sent to addr.
+void MQClientAPIImpl::callSignatureBeforeRequest(
+    const string& addr, RemotingCommand& request,
+    const SessionCredentials& session_credentials) {
+  ClientRPCHook signingHook(session_credentials);
+  signingHook.doBeforeRequest(addr, request);
+}
+
+// Note: all request rules: throw exception if got broker error response,
+// exclude getTopicRouteInfoFromNameServer and unregisterClient
+// Sends UPDATE_AND_CREATE_TOPIC to the broker at addr, built from
+// topicConfig and defaultTopic. Returns normally only on SUCCESS_VALUE;
+// throws MQBrokerException for any other response code or when no response
+// is received.
+void MQClientAPIImpl::createTopic(
+    const string& addr, const string& defaultTopic, TopicConfig topicConfig,
+    const SessionCredentials& sessionCredentials) {
+  string topicName = topicConfig.getTopicName();
+
+  CreateTopicRequestHeader* header = new CreateTopicRequestHeader();
+  header->topic = topicName;
+  header->defaultTopic = defaultTopic;
+  header->readQueueNums = topicConfig.getReadQueueNums();
+  header->writeQueueNums = topicConfig.getWriteQueueNums();
+  header->perm = topicConfig.getPerm();
+  header->topicFilterType = topicConfig.getTopicFilterType();
+
+  RemotingCommand request(UPDATE_AND_CREATE_TOPIC, header);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (response->getCode() == SUCCESS_VALUE) {
+    return;
+  }
+  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                    response->getCode());
+}
+
+// Builds a SEND_MESSAGE request carrying the message body and dispatches it
+// according to communicationMode. Only ComMode_SYNC returns the broker's
+// result; one-way, async and unrecognized modes return a default-constructed
+// SendResult.
+SendResult MQClientAPIImpl::sendMessage(
+    const string& addr, const string& brokerName, const MQMessage& msg,
+    SendMessageRequestHeader* pRequestHeader, int timeoutMillis,
+    int communicationMode, SendCallback* pSendCallback,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(SEND_MESSAGE, pRequestHeader);
+  string payload = msg.getBody();
+  request.SetBody(payload.c_str(), payload.length());
+  request.setMsgBody(payload);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  if (communicationMode == ComMode_SYNC) {
+    return sendMessageSync(addr, brokerName, msg, request, timeoutMillis);
+  }
+
+  if (communicationMode == ComMode_ONEWAY) {
+    m_pRemotingClient->invokeOneway(addr, request);
+  } else if (communicationMode == ComMode_ASYNC) {
+    sendMessageAsync(addr, brokerName, msg, request, pSendCallback,
+                     timeoutMillis);
+  }
+  return SendResult();
+}
+
+// Encodes pHeartbeatData into a HEART_BEAT request and sends it to addr,
+// logging when the remoting client reports success. pHeartbeatData is
+// dereferenced without a NULL check.
+void MQClientAPIImpl::sendHearbeat(
+    const string& addr, HeartbeatData* pHeartbeatData,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(HEART_BEAT, NULL);
+
+  string encoded;
+  pHeartbeatData->Encode(encoded);
+  request.SetBody(encoded.data(), encoded.length());
+  request.setMsgBody(encoded);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  const bool delivered = m_pRemotingClient->invokeHeartBeat(addr, request);
+  if (delivered) {
+    LOG_INFO("sendheartbeat to broker:%s success", addr.c_str());
+  }
+}
+
+// Sends UNREGISTER_CLIENT to the broker at addr. Never throws on a broker
+// error: a non-success response is only logged, and a missing response is
+// ignored.
+void MQClientAPIImpl::unregisterClient(
+    const string& addr, const string& clientID, const string& producerGroup,
+    const string& consumerGroup, const SessionCredentials& sessionCredentials) {
+  LOG_INFO("unregisterClient to broker:%s", addr.c_str());
+
+  UnregisterClientRequestHeader* header = new UnregisterClientRequestHeader(
+      clientID, producerGroup, consumerGroup);
+  RemotingCommand request(UNREGISTER_CLIENT, header);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request));
+  if (!response) {
+    return;
+  }
+
+  if (response->getCode() == SUCCESS_VALUE) {
+    LOG_INFO("unregisterClient to:%s success", addr.c_str());
+    return;
+  }
+  LOG_WARN("unregisterClient fail:%s,%d", response->getRemark().c_str(),
+           response->getCode());
+}
+
+// Asks the name server for the route data of `topic`.
+// Returns the decoded TopicRouteData only for a SUCCESS_VALUE response with
+// a non-empty body. Returns NULL in every other case: no response, a body
+// with a size but no data, an empty successful body, TOPIC_NOT_EXIST, or any
+// other code (which is additionally logged as a warning). Does not throw on
+// broker errors.
+TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(
+    const string& topic, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(GET_ROUTEINTO_BY_TOPIC,
+                          new GetRouteInfoRequestHeader(topic));
+  callSignatureBeforeRequest("", request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync("", request, timeoutMillis));
+  if (!pResponse) {
+    return NULL;
+  }
+
+  // A body that claims a size but carries no data is unusable.
+  if (pResponse->GetBody()->getSize() != 0 &&
+      pResponse->GetBody()->getData() == NULL) {
+    return NULL;
+  }
+
+  if (pResponse->getCode() == SUCCESS_VALUE) {
+    const MemoryBlock* pbody = pResponse->GetBody();
+    if (pbody->getSize()) {
+      return TopicRouteData::Decode(pbody);
+    }
+    // Empty successful body is handled like TOPIC_NOT_EXIST.
+    return NULL;
+  }
+  if (pResponse->getCode() == TOPIC_NOT_EXIST) {
+    return NULL;
+  }
+
+  LOG_WARN("%s,%d", pResponse->getRemark().c_str(), pResponse->getCode());
+  return NULL;
+}
+
+// Requests the full topic list from the name server.
+// Returns the decoded TopicList for a SUCCESS_VALUE response with a
+// non-empty body. Returns NULL when there is no response or the body has a
+// size but no data. Any other usable response (including a successful one
+// with an empty body) raises MQClientException with the response remark and
+// code.
+TopicList* MQClientAPIImpl::getTopicListFromNameServer(
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(GET_ALL_TOPIC_LIST_FROM_NAMESERVER, NULL);
+  callSignatureBeforeRequest("", request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync("", request));
+  if (!pResponse) {
+    return NULL;
+  }
+  if (pResponse->GetBody()->getSize() != 0 &&
+      pResponse->GetBody()->getData() == NULL) {
+    return NULL;
+  }
+
+  if (pResponse->getCode() == SUCCESS_VALUE) {
+    const MemoryBlock* pbody = pResponse->GetBody();
+    if (pbody->getSize()) {
+      return TopicList::Decode(pbody);
+    }
+  }
+
+  THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(),
+                    pResponse->getCode());
+}
+
+// Not implemented: ignores its arguments and always returns 0.
+int MQClientAPIImpl::wipeWritePermOfBroker(const string& namesrvAddr,
+ const string& brokerName,
+ int timeoutMillis) {
+ return 0;
+}
+
+// Not implemented: empty body, no request is sent.
+void MQClientAPIImpl::deleteTopicInBroker(const string& addr,
+ const string& topic,
+ int timeoutMillis) {}
+
+// Not implemented: empty body, no request is sent.
+void MQClientAPIImpl::deleteTopicInNameServer(const string& addr,
+ const string& topic,
+ int timeoutMillis) {}
+
+// Not implemented: empty body, no request is sent.
+void MQClientAPIImpl::deleteSubscriptionGroup(const string& addr,
+ const string& groupName,
+ int timeoutMillis) {}
+
+// Not implemented: ignores its arguments and always returns an empty string.
+string MQClientAPIImpl::getKVConfigByValue(const string& projectNamespace,
+ const string& projectGroup,
+ int timeoutMillis) {
+ return "";
+}
+
+// Not implemented: ignores its arguments and returns a default-constructed KVTable.
+KVTable MQClientAPIImpl::getKVListByNamespace(const string& projectNamespace,
+ int timeoutMillis) {
+ return KVTable();
+}
+
+// Not implemented: empty body, no request is sent.
+void MQClientAPIImpl::deleteKVConfigByValue(const string& projectNamespace,
+ const string& projectGroup,
+ int timeoutMillis) {}
+
+// Sends the already-encoded request synchronously and converts the response
+// into a SendResult via processSendResponse().
+// Throws MQClientException("response is null") only when no response was
+// received. An exception raised while processing a response is logged and
+// rethrown unchanged, so the caller sees the broker's real remark and code
+// instead of a misleading "response is null".
+SendResult MQClientAPIImpl::sendMessageSync(const string& addr,
+                                            const string& brokerName,
+                                            const MQMessage& msg,
+                                            RemotingCommand& request,
+                                            int timeoutMillis) {
+  //<!block util response;
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+  if (pResponse != NULL) {
+    try {
+      LOG_DEBUG("sendMessageSync success:%s to addr:%s,brokername:%s",
+                msg.toString().c_str(), addr.c_str(), brokerName.c_str());
+      return processSendResponse(brokerName, msg, pResponse.get());
+    } catch (...) {
+      LOG_ERROR("send error");
+      throw;  // keep the original exception instead of masking it
+    }
+  }
+  THROW_MQEXCEPTION(MQClientException, "response is null", -1);
+}
+
+// Wraps pSendCallback in a SendCallbackWrap and submits the request
+// asynchronously. If submission fails, the wrapper's onException() is
+// invoked and the wrapper is deleted here.
+void MQClientAPIImpl::sendMessageAsync(const string& addr,
+                                       const string& brokerName,
+                                       const MQMessage& msg,
+                                       RemotingCommand& request,
+                                       SendCallback* pSendCallback,
+                                       int64 timeoutMilliseconds) {
+  //<!delete in future;
+  AsyncCallbackWrap* cbw =
+      new SendCallbackWrap(brokerName, msg, pSendCallback, this);
+
+  const bool submitted =
+      m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds);
+  if (submitted) {
+    return;
+  }
+
+  LOG_ERROR("sendMessageAsync failed to addr:%s", addr.c_str());
+  if (!cbw) {
+    THROW_MQEXCEPTION(MQClientException, "sendMessageAsync failed", -1);
+  }
+  cbw->onException();
+  deleteAndZero(cbw);
+}
+
+// Builds a PULL_MESSAGE request and dispatches it according to
+// communicationMode. Only ComMode_SYNC returns a result; ComMode_ASYNC
+// delivers through pullCallback, and one-way or unrecognized modes send
+// nothing. All non-sync paths return NULL.
+PullResult* MQClientAPIImpl::pullMessage(
+    const string& addr, PullMessageRequestHeader* pRequestHeader,
+    int timeoutMillis, int communicationMode, PullCallback* pullCallback,
+    void* pArg, const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(PULL_MESSAGE, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  if (communicationMode == ComMode_SYNC) {
+    return pullMessageSync(addr, request, timeoutMillis);
+  }
+  if (communicationMode == ComMode_ASYNC) {
+    pullMessageAsync(addr, request, timeoutMillis, pullCallback, pArg);
+  }
+  return NULL;
+}
+
+// Wraps pullCallback in a PullCallbackWarp and submits the request
+// asynchronously. If submission fails, the wrapper is deleted and
+// MQClientException is thrown.
+void MQClientAPIImpl::pullMessageAsync(const string& addr,
+                                       RemotingCommand& request,
+                                       int timeoutMillis,
+                                       PullCallback* pullCallback, void* pArg) {
+  //<!delete in future;
+  AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+
+  const bool submitted =
+      m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis);
+  if (submitted) {
+    return;
+  }
+
+  LOG_ERROR("pullMessageAsync failed of addr:%s", addr.c_str());
+  deleteAndZero(cbw);
+  THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
+}
+
+// Sends the pull request synchronously and converts the response with
+// processPullResponse(). Returns NULL when there is no response, when the
+// body has a size but no data, or when processPullResponse() throws an
+// MQException (which is logged here).
+PullResult* MQClientAPIImpl::pullMessageSync(const string& addr,
+                                             RemotingCommand& request,
+                                             int timeoutMillis) {
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+  if (pResponse != NULL) {
+    if (((*(pResponse->GetBody())).getSize() == 0) ||
+        ((*(pResponse->GetBody())).getData() != NULL)) {
+      try {
+        PullResult* pullResult = processPullResponse(pResponse.get());
+        return pullResult;
+      } catch (MQException& e) {
+        // The message may contain '%'; never use it as the format string.
+        LOG_ERROR("%s", e.what());
+        return NULL;
+      }
+    }
+  }
+  return NULL;
+}
+
+// Maps a send response onto a SendResult.
+// SUCCESS_VALUE, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT and
+// SLAVE_NOT_AVAILABLE produce a SendResult with the matching SendStatus,
+// built from the response header's msgId, queueId and queueOffset.
+// Any other code, or a recognized code whose header is missing, raises
+// MQClientException.
+SendResult MQClientAPIImpl::processSendResponse(const string& brokerName,
+                                                const MQMessage& msg,
+                                                RemotingCommand* pResponse) {
+  SendStatus sendStatus = SEND_OK;
+  int res = 0;
+  switch (pResponse->getCode()) {
+    case FLUSH_DISK_TIMEOUT:
+      sendStatus = SEND_FLUSH_DISK_TIMEOUT;
+      break;
+    case FLUSH_SLAVE_TIMEOUT:
+      sendStatus = SEND_FLUSH_SLAVE_TIMEOUT;
+      break;
+    case SLAVE_NOT_AVAILABLE:
+      sendStatus = SEND_SLAVE_NOT_AVAILABLE;
+      break;
+    case SUCCESS_VALUE:
+      sendStatus = SEND_OK;
+      break;
+    default:
+      res = -1;
+      break;
+  }
+  if (res == 0) {
+    SendMessageResponseHeader* responseHeader =
+        (SendMessageResponseHeader*)pResponse->getCommandHeader();
+    // Same guard as processPullResponse(): fail cleanly instead of
+    // dereferencing a missing header.
+    if (!responseHeader) {
+      LOG_ERROR("processSendResponse:responseHeader is NULL");
+      THROW_MQEXCEPTION(MQClientException,
+                        "processSendResponse:responseHeader is NULL", -1);
+    }
+    MQMessageQueue messageQueue(msg.getTopic(), brokerName,
+                                responseHeader->queueId);
+    return SendResult(sendStatus, responseHeader->msgId, messageQueue,
+                      responseHeader->queueOffset);
+  }
+  LOG_ERROR("processSendResponse error remark:%s, error code:%d",
+            (pResponse->getRemark()).c_str(), pResponse->getCode());
+  THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(),
+                    pResponse->getCode());
+}
+
+// Maps a pull response onto a newly allocated PullResultExt.
+// The response code selects the PullStatus; an unrecognized code raises
+// MQBrokerException. A missing response header, or a FOUND status with an
+// empty body, raises MQClientException. The caller is expected to have
+// validated the response body beforehand.
+PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* pResponse) {
+  PullStatus pullStatus = NO_NEW_MSG;
+  switch (pResponse->getCode()) {
+    case SUCCESS_VALUE:
+      pullStatus = FOUND;
+      break;
+    case PULL_NOT_FOUND:
+      pullStatus = NO_NEW_MSG;
+      break;
+    case PULL_RETRY_IMMEDIATELY:
+      pullStatus = NO_MATCHED_MSG;
+      break;
+    case PULL_OFFSET_MOVED:
+      pullStatus = OFFSET_ILLEGAL;
+      break;
+    default:
+      THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                        pResponse->getCode());
+      break;
+  }
+
+  PullMessageResponseHeader* header =
+      static_cast<PullMessageResponseHeader*>(pResponse->getCommandHeader());
+  if (header == NULL) {
+    LOG_ERROR("processPullResponse:responseHeader is NULL");
+    THROW_MQEXCEPTION(MQClientException,
+                      "processPullResponse:responseHeader is NULL", -1);
+  }
+
+  //<!get body,delete outsite;
+  MemoryBlock body = *(pResponse->GetBody());
+  if (body.getSize() != 0) {
+    return new PullResultExt(pullStatus, header->nextBeginOffset,
+                             header->minOffset, header->maxOffset,
+                             (int)header->suggestWhichBrokerId, body);
+  }
+
+  // Empty body: only acceptable when nothing was found.
+  if (pullStatus == FOUND) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "memoryBody size is 0, but pullStatus equals found",
+                      -1);
+  }
+  return new PullResultExt(pullStatus, header->nextBeginOffset,
+                           header->minOffset, header->maxOffset,
+                           (int)header->suggestWhichBrokerId);
+}
+
+//<!***************************************************************************
+// Queries the broker at addr for the minimum offset of (topic, queueId).
+// Returns the offset from the response header on SUCCESS_VALUE.
+// Throws MQBrokerException for any other response code or when no response
+// is received, and MQClientException when a successful response carries no
+// header.
+int64 MQClientAPIImpl::getMinOffset(
+    const string& addr, const string& topic, int queueId, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  GetMinOffsetRequestHeader* pRequestHeader = new GetMinOffsetRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+
+  RemotingCommand request(GET_MIN_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (response->getCode() != SUCCESS_VALUE) {
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+
+  GetMinOffsetResponseHeader* responseHeader =
+      (GetMinOffsetResponseHeader*)response->getCommandHeader();
+  // Guard against a missing header instead of dereferencing NULL.
+  if (!responseHeader) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "getMinOffset:responseHeader is NULL", -1);
+  }
+  return responseHeader->offset;
+}
+
+// Queries the broker at addr for the maximum offset of (topic, queueId).
+// Returns the offset from the response header on SUCCESS_VALUE.
+// Throws MQBrokerException for any other response code or when no response
+// is received, and MQClientException when a successful response carries no
+// header.
+int64 MQClientAPIImpl::getMaxOffset(
+    const string& addr, const string& topic, int queueId, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  GetMaxOffsetRequestHeader* pRequestHeader = new GetMaxOffsetRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+
+  RemotingCommand request(GET_MAX_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (response->getCode() != SUCCESS_VALUE) {
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+
+  GetMaxOffsetResponseHeader* responseHeader =
+      (GetMaxOffsetResponseHeader*)response->getCommandHeader();
+  // Guard against a missing header instead of dereferencing NULL.
+  if (!responseHeader) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "getMaxOffset:responseHeader is NULL", -1);
+  }
+  return responseHeader->offset;
+}
+
+// Asks the broker at addr for the offset of (topic, queueId) matching
+// `timestamp`. Returns the offset from the response header on SUCCESS_VALUE.
+// Throws MQBrokerException for any other response code or when no response
+// is received, and MQClientException when a successful response carries no
+// header.
+int64 MQClientAPIImpl::searchOffset(
+    const string& addr, const string& topic, int queueId, uint64_t timestamp,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  SearchOffsetRequestHeader* pRequestHeader = new SearchOffsetRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+  pRequestHeader->timestamp = timestamp;
+
+  RemotingCommand request(SEARCH_OFFSET_BY_TIMESTAMP, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (response->getCode() != SUCCESS_VALUE) {
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+
+  SearchOffsetResponseHeader* responseHeader =
+      (SearchOffsetResponseHeader*)response->getCommandHeader();
+  // Guard against a missing header instead of dereferencing NULL.
+  if (!responseHeader) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "searchOffset:responseHeader is NULL", -1);
+  }
+  return responseHeader->offset;
+}
+
+// Sends VIEW_MESSAGE_BY_ID for the given physical offset.
+// NOTE(review): no path returns a message. Every received response,
+// including SUCCESS_VALUE, ends in MQBrokerException carrying the response
+// remark and code, and a missing response throws "response is null". This
+// looks unimplemented - confirm before relying on it.
+MQMessageExt* MQClientAPIImpl::viewMessage(
+    const string& addr, int64 phyoffset, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  ViewMessageRequestHeader* header = new ViewMessageRequestHeader();
+  header->offset = phyoffset;
+
+  RemotingCommand request(VIEW_MESSAGE_BY_ID, header);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                    response->getCode());
+}
+
+// Queries the broker at addr for the earliest message store time of
+// (topic, queueId). Returns the timestamp from the response header on
+// SUCCESS_VALUE. Throws MQBrokerException for any other response code or
+// when no response is received, and MQClientException when a successful
+// response carries no header.
+int64 MQClientAPIImpl::getEarliestMsgStoretime(
+    const string& addr, const string& topic, int queueId, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  GetEarliestMsgStoretimeRequestHeader* pRequestHeader =
+      new GetEarliestMsgStoretimeRequestHeader();
+  pRequestHeader->topic = topic;
+  pRequestHeader->queueId = queueId;
+
+  RemotingCommand request(GET_EARLIEST_MSG_STORETIME, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (response->getCode() != SUCCESS_VALUE) {
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+
+  GetEarliestMsgStoretimeResponseHeader* responseHeader =
+      (GetEarliestMsgStoretimeResponseHeader*)response->getCommandHeader();
+  // Guard against a missing header instead of dereferencing NULL.
+  if (!responseHeader) {
+    THROW_MQEXCEPTION(MQClientException,
+                      "getEarliestMsgStoretime:responseHeader is NULL", -1);
+  }
+  return responseHeader->timestamp;
+}
+
+// Fetches the consumer ids of consumerGroup from the broker at addr and
+// decodes them into cids.
+// Returns normally only for a SUCCESS_VALUE response with a non-empty body.
+// A usable response with any other outcome throws MQBrokerException with the
+// response remark and code; a missing response, or a body with a size but no
+// data, throws MQBrokerException("response is null").
+void MQClientAPIImpl::getConsumerIdListByGroup(
+    const string& addr, const string& consumerGroup, vector<string>& cids,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  GetConsumerListByGroupRequestHeader* header =
+      new GetConsumerListByGroupRequestHeader();
+  header->consumerGroup = consumerGroup;
+
+  RemotingCommand request(GET_CONSUMER_LIST_BY_GROUP, header);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  const bool usable = pResponse &&
+                      ((pResponse->GetBody()->getSize() == 0) ||
+                       (pResponse->GetBody()->getData() != NULL));
+  if (!usable) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+
+  if (pResponse->getCode() == SUCCESS_VALUE) {
+    const MemoryBlock* pbody = pResponse->GetBody();
+    if (pbody->getSize()) {
+      GetConsumerListByGroupResponseBody::Decode(pbody, cids);
+      return;
+    }
+  }
+  THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                    pResponse->getCode());
+}
+
+// Sends QUERY_CONSUMER_OFFSET with the given header to the broker at addr.
+// Returns the offset from the response header on SUCCESS_VALUE.
+// Throws MQBrokerException for any other response code or when no response
+// is received, and MQClientException when a successful response carries no
+// header.
+int64 MQClientAPIImpl::queryConsumerOffset(
+    const string& addr, QueryConsumerOffsetRequestHeader* pRequestHeader,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(QUERY_CONSUMER_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (response) {
+    if (response->getCode() == SUCCESS_VALUE) {
+      QueryConsumerOffsetResponseHeader* responseHeader =
+          (QueryConsumerOffsetResponseHeader*)response->getCommandHeader();
+      // Guard against a missing header instead of dereferencing NULL.
+      if (!responseHeader) {
+        THROW_MQEXCEPTION(MQClientException,
+                          "queryConsumerOffset:responseHeader is NULL", -1);
+      }
+      return responseHeader->offset;
+    }
+    THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                      response->getCode());
+  }
+  THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  return -1;
+}
+
+// Sends UPDATE_CONSUMER_OFFSET synchronously to the broker at addr.
+// Returns normally only on SUCCESS_VALUE; throws MQBrokerException for any
+// other response code or when no response is received.
+void MQClientAPIImpl::updateConsumerOffset(
+    const string& addr, UpdateConsumerOffsetRequestHeader* pRequestHeader,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(UPDATE_CONSUMER_OFFSET, pRequestHeader);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (response->getCode() == SUCCESS_VALUE) {
+    return;
+  }
+  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                    response->getCode());
+}
+
+// Sends UPDATE_CONSUMER_OFFSET via invokeOneway(); no response is read here.
+// timeoutMillis is not used in this function.
+void MQClientAPIImpl::updateConsumerOffsetOneway(
+ const string& addr, UpdateConsumerOffsetRequestHeader* pRequestHeader,
+ int timeoutMillis, const SessionCredentials& sessionCredentials) {
+ RemotingCommand request(UPDATE_CONSUMER_OFFSET, pRequestHeader);
+ callSignatureBeforeRequest(addr, request, sessionCredentials);
+ request.Encode();
+
+ m_pRemotingClient->invokeOneway(addr, request);
+}
+
+// Sends CONSUMER_SEND_MSG_BACK for msg to the broker address derived from
+// the message's store host, with the given consumer group and delay level.
+// Returns normally only on SUCCESS_VALUE; throws MQBrokerException for any
+// other response code or when no response is received.
+void MQClientAPIImpl::consumerSendMessageBack(
+    MQMessageExt& msg, const string& consumerGroup, int delayLevel,
+    int timeoutMillis, const SessionCredentials& sessionCredentials) {
+  ConsumerSendMsgBackRequestHeader* header =
+      new ConsumerSendMsgBackRequestHeader();
+  header->group = consumerGroup;
+  header->offset = msg.getCommitLogOffset();
+  header->delayLevel = delayLevel;
+
+  string addr = socketAddress2IPPort(msg.getStoreHost());
+  RemotingCommand request(CONSUMER_SEND_MSG_BACK, header);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> response(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!response) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (response->getCode() == SUCCESS_VALUE) {
+    return;
+  }
+  THROW_MQEXCEPTION(MQBrokerException, response->getRemark(),
+                    response->getCode());
+}
+
+// Sends LOCK_BATCH_MQ with the encoded requestBody to the broker at addr.
+// On SUCCESS_VALUE a non-empty response body is decoded into mqs and the
+// call returns. A usable response with any other code throws
+// MQBrokerException with the response remark and code; a missing response,
+// or a body with a size but no data, throws
+// MQBrokerException("response is null").
+void MQClientAPIImpl::lockBatchMQ(
+    const string& addr, LockBatchRequestBody* requestBody,
+    vector<MQMessageQueue>& mqs, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(LOCK_BATCH_MQ, NULL);
+  string encoded;
+  requestBody->Encode(encoded);
+  request.SetBody(encoded.data(), encoded.length());
+  request.setMsgBody(encoded);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  const bool usable = pResponse &&
+                      ((pResponse->GetBody()->getSize() == 0) ||
+                       (pResponse->GetBody()->getData() != NULL));
+  if (!usable) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+
+  if (pResponse->getCode() == SUCCESS_VALUE) {
+    const MemoryBlock* pbody = pResponse->GetBody();
+    if (pbody->getSize()) {
+      LockBatchResponseBody::Decode(pbody, mqs);
+    }
+    return;
+  }
+  THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                    pResponse->getCode());
+}
+
+// Sends UNLOCK_BATCH_MQ with the encoded requestBody to the broker at addr.
+// Returns normally only on SUCCESS_VALUE; throws MQBrokerException for any
+// other response code or when no response is received.
+void MQClientAPIImpl::unlockBatchMQ(
+    const string& addr, UnlockBatchRequestBody* requestBody, int timeoutMillis,
+    const SessionCredentials& sessionCredentials) {
+  RemotingCommand request(UNLOCK_BATCH_MQ, NULL);
+  string encoded;
+  requestBody->Encode(encoded);
+  request.SetBody(encoded.data(), encoded.length());
+  request.setMsgBody(encoded);
+  callSignatureBeforeRequest(addr, request, sessionCredentials);
+  request.Encode();
+
+  unique_ptr<RemotingCommand> pResponse(
+      m_pRemotingClient->invokeSync(addr, request, timeoutMillis));
+
+  if (!pResponse) {
+    THROW_MQEXCEPTION(MQBrokerException, "response is null", -1);
+  }
+  if (pResponse->getCode() == SUCCESS_VALUE) {
+    return;
+  }
+  THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(),
+                    pResponse->getCode());
+}
+
+//<!************************************************************************
+} //<!end namespace;
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/70ce5c77/rocketmq-cpp/src/MQClientAPIImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-cpp/src/MQClientAPIImpl.h b/rocketmq-cpp/src/MQClientAPIImpl.h
new file mode 100644
index 0000000..31e61a0
--- /dev/null
+++ b/rocketmq-cpp/src/MQClientAPIImpl.h
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __MQCLIENTAPIIMPL_H__
+#define __MQCLIENTAPIIMPL_H__
+#include "AsyncCallback.h"
+#include "ClientRPCHook.h"
+#include "ClientRemotingProcessor.h"
+#include "CommandHeader.h"
+#include "HeartbeatData.h"
+#include "KVTable.h"
+#include "LockBatchBody.h"
+#include "MQClientException.h"
+#include "MQMessageExt.h"
+#include "MQProtos.h"
+#include "SendResult.h"
+#include "SocketUtil.h"
+#include "TcpRemotingClient.h"
+#include "TopAddressing.h"
+#include "TopicConfig.h"
+#include "TopicList.h"
+#include "TopicRouteData.h"
+#include "UtilAll.h"
+#include "VirtualEnvUtil.h"
+
+namespace rocketmq {
+//<!wraps every client API call that goes over the network;
+//<!************************************************************************
+class MQClientAPIImpl {  // Groups the client's remote requests (name server / broker, judging by the method names) behind member functions.
+ public:
+ MQClientAPIImpl(const string& mqClientId, ClientRemotingProcessor* clientRemotingProcessor,
+ int pullThreadNum, uint64_t tcpConnectTimeout,
+ uint64_t tcpTransportTryLockTimeout, string unitName);  // NOTE(review): units of the two timeouts are not visible here - confirm
+ virtual ~MQClientAPIImpl();
+ void stopAllTcpTransportThread();
+ bool writeDataToFile(string filename, string data, bool isSync);  // both strings are taken by value, i.e. copied per call
+ string fetchNameServerAddr(const string& NSDomain);
+ void updateNameServerAddr(const string& addrs);
+
+ void callSignatureBeforeRequest(  // unlockBatchMQ calls this right before request.Encode()
+ const string& addr, RemotingCommand& request,  // request is a non-const reference, so it may be modified
+ const SessionCredentials& session_credentials);
+ void createTopic(const string& addr, const string& defaultTopic,
+ TopicConfig topicConfig,  // taken by value
+ const SessionCredentials& sessionCredentials);
+
+ SendResult sendMessage(const string& addr, const string& brokerName,
+ const MQMessage& msg,
+ SendMessageRequestHeader* pRequestHeader,
+ int timeoutMillis, int communicationMode,  // NOTE(review): communicationMode is a plain int; valid values are defined elsewhere
+ SendCallback* pSendCallback,
+ const SessionCredentials& sessionCredentials);
+
+ PullResult* pullMessage(const string& addr,  // NOTE(review): returns a raw pointer; ownership is not visible here - confirm who deletes it
+ PullMessageRequestHeader* pRequestHeader,
+ int timeoutMillis, int communicationMode,
+ PullCallback* pullCallback, void* pArg,
+ const SessionCredentials& sessionCredentials);
+
+ void sendHearbeat(const string& addr, HeartbeatData* pHeartbeatData,  // name is spelled "Hearbeat" (sic); part of the public interface
+ const SessionCredentials& sessionCredentials);
+
+ void unregisterClient(const string& addr, const string& clientID,
+ const string& producerGroup,
+ const string& consumerGroup,
+ const SessionCredentials& sessionCredentials);
+
+ TopicRouteData* getTopicRouteInfoFromNameServer(  // NOTE(review): raw pointer return - confirm ownership
+ const string& topic, int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ TopicList* getTopicListFromNameServer(  // NOTE(review): raw pointer return - confirm ownership
+ const SessionCredentials& sessionCredentials);
+
+ int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName,  // the admin-style calls from here to deleteKVConfigByValue take no SessionCredentials
+ int timeoutMillis);
+
+ void deleteTopicInBroker(const string& addr, const string& topic,
+ int timeoutMillis);
+
+ void deleteTopicInNameServer(const string& addr, const string& topic,
+ int timeoutMillis);
+
+ void deleteSubscriptionGroup(const string& addr, const string& groupName,
+ int timeoutMillis);
+
+ string getKVConfigByValue(const string& projectNamespace,
+ const string& projectGroup, int timeoutMillis);
+
+ KVTable getKVListByNamespace(const string& projectNamespace,
+ int timeoutMillis);
+
+ void deleteKVConfigByValue(const string& projectNamespace,
+ const string& projectGroup, int timeoutMillis);
+
+ SendResult processSendResponse(const string& brokerName, const MQMessage& msg,
+ RemotingCommand* pResponse);
+
+ PullResult* processPullResponse(RemotingCommand* pResponse);  // NOTE(review): raw pointer return - confirm ownership
+
+ int64 getMinOffset(const string& addr, const string& topic, int queueId,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ int64 getMaxOffset(const string& addr, const string& topic, int queueId,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ int64 searchOffset(const string& addr, const string& topic, int queueId,
+ uint64_t timestamp, int timeoutMillis,  // NOTE(review): timestamp unit is not visible here - confirm
+ const SessionCredentials& sessionCredentials);
+
+ MQMessageExt* viewMessage(const string& addr, int64 phyoffset,  // NOTE(review): raw pointer return - confirm ownership
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ int64 getEarliestMsgStoretime(const string& addr, const string& topic,
+ int queueId, int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ void getConsumerIdListByGroup(const string& addr, const string& consumerGroup,
+ vector<string>& cids, int timeoutMillis,  // cids is a non-const reference - presumably the output list; confirm in the .cpp
+ const SessionCredentials& sessionCredentials);
+
+ int64 queryConsumerOffset(const string& addr,
+ QueryConsumerOffsetRequestHeader* pRequestHeader,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ void updateConsumerOffset(const string& addr,
+ UpdateConsumerOffsetRequestHeader* pRequestHeader,
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ void updateConsumerOffsetOneway(
+ const string& addr, UpdateConsumerOffsetRequestHeader* pRequestHeader,
+ int timeoutMillis, const SessionCredentials& sessionCredentials);
+
+ void consumerSendMessageBack(MQMessageExt& msg, const string& consumerGroup,
+ int delayLevel, int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ void lockBatchMQ(const string& addr, LockBatchRequestBody* requestBody,
+ vector<MQMessageQueue>& mqs, int timeoutMillis,  // mqs is a non-const reference - presumably receives the locked queues; confirm in the .cpp
+ const SessionCredentials& sessionCredentials);
+
+ void unlockBatchMQ(const string& addr, UnlockBatchRequestBody* requestBody,  // throws MQBrokerException if the response is null or its code is not SUCCESS_VALUE
+ int timeoutMillis,
+ const SessionCredentials& sessionCredentials);
+
+ private:  // private send / pull helpers, split into Sync and Async variants
+ SendResult sendMessageSync(const string& addr, const string& brokerName,
+ const MQMessage& msg, RemotingCommand& request,
+ int timeoutMillis);
+
+ void sendMessageAsync(const string& addr, const string& brokerName,
+ const MQMessage& msg, RemotingCommand& request,
+ SendCallback* pSendCallback, int64 timeoutMilliseconds);  // timeout is int64 here, int in the sibling helpers
+
+ PullResult* pullMessageSync(const string& addr, RemotingCommand& request,
+ int timeoutMillis);
+
+ void pullMessageAsync(const string& addr, RemotingCommand& request,
+ int timeoutMillis, PullCallback* pullCallback,
+ void* pArg);
+
+ private:  // data members
+ unique_ptr<TcpRemotingClient> m_pRemotingClient;  // owned; unlockBatchMQ issues invokeSync through it
+ unique_ptr<TopAddressing> m_topAddressing;  // owned
+ string m_nameSrvAddr;
+ bool m_firstFetchNameSrv;
+ string m_mqClientId;
+};
+} //<!end namespace;
+//<!***************************************************************************
+#endif