You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/08/12 18:14:15 UTC

[1/9] git commit: Improve JMX support for streaming

Updated Branches:
  refs/heads/cassandra-2.0 b87270b39 -> 839cc3307
  refs/heads/cassandra-2.0.0 09a4dc055 -> a34790085
  refs/heads/trunk d9a64a4a3 -> cb349d9e7


Improve JMX support for streaming

patch by yukim; reviewed by thobbs for CASSANDRA-5859


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0d6ed12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0d6ed12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0d6ed12

Branch: refs/heads/cassandra-2.0
Commit: a0d6ed1290540673b0336418cbca0dd2f07e64a8
Parents: 09a4dc0
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 8 14:30:16 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:09:56 2013 -0500

----------------------------------------------------------------------
 .../cassandra/streaming/StreamManager.java      |  40 ++++-
 .../cassandra/streaming/StreamManagerMBean.java |   6 +-
 .../management/ProgressInfoCompositeData.java   | 103 ++++++++++++
 .../SessionCompleteEventCompositeData.java      |  71 ++++++++
 .../management/SessionInfoCompositeData.java    | 163 +++++++++++++++++++
 .../management/StreamEventJMXNotifier.java      |  78 +++++++++
 .../management/StreamStateCompositeData.java    | 102 ++++++++++++
 .../management/StreamSummaryCompositeData.java  |  82 ++++++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  12 +-
 9 files changed, 650 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 5fc1c75..ccd0053 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -21,6 +21,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -29,6 +35,8 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
 
 /**
  * StreamManager manages currently running {@link StreamResultFuture}s and provides status of all operation invoked.
@@ -60,6 +68,8 @@ public class StreamManager implements StreamManagerMBean
         return limiter;
     }
 
+    private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
+
     /*
      * Currently running streams. Removed after completion/failure.
      * We manage them in two different maps to distinguish plan from initiated ones to
@@ -68,19 +78,20 @@ public class StreamManager implements StreamManagerMBean
     private final Map<UUID, StreamResultFuture> initiatedStreams = new NonBlockingHashMap<>();
     private final Map<UUID, StreamResultFuture> receivingStreams = new NonBlockingHashMap<>();
 
-    public Set<StreamState> getCurrentStreams()
+    public Set<CompositeData> getCurrentStreams()
     {
-        return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, StreamState>()
+        return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, CompositeData>()
         {
-            public StreamState apply(StreamResultFuture input)
+            public CompositeData apply(StreamResultFuture input)
             {
-                return input.getCurrentState();
+                return StreamStateCompositeData.toCompositeData(input.getCurrentState());
             }
         }));
     }
 
     public void register(final StreamResultFuture result)
     {
+        result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or not)
         result.addListener(new Runnable()
         {
@@ -95,6 +106,7 @@ public class StreamManager implements StreamManagerMBean
 
     public void registerReceiving(final StreamResultFuture result)
     {
+        result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or not)
         result.addListener(new Runnable()
         {
@@ -111,4 +123,24 @@ public class StreamManager implements StreamManagerMBean
     {
         return receivingStreams.get(planId);
     }
+
+    public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
+    {
+        notifier.addNotificationListener(listener, filter, handback);
+    }
+
+    public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException
+    {
+        notifier.removeNotificationListener(listener);
+    }
+
+    public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException
+    {
+        notifier.removeNotificationListener(listener, filter, handback);
+    }
+
+    public MBeanNotificationInfo[] getNotificationInfo()
+    {
+        return notifier.getNotificationInfo();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
index f338fb5..f329596 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -18,13 +18,15 @@
 package org.apache.cassandra.streaming;
 
 import java.util.Set;
+import javax.management.NotificationEmitter;
+import javax.management.openmbean.CompositeData;
 
-public interface StreamManagerMBean
+public interface StreamManagerMBean extends NotificationEmitter
 {
     public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
 
     /**
      * Returns the current state of all ongoing streams.
      */
-    Set<StreamState> getCurrentStreams();
+    Set<CompositeData> getCurrentStreams();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
new file mode 100644
index 0000000..b361b1b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+
+public class ProgressInfoCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "fileName",
+                                                            "direction",
+                                                            "currentBytes",
+                                                            "totalBytes"};
+    private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
+                                                            "Session peer",
+                                                            "Name of the file",
+                                                            "Direction('IN' or 'OUT')",
+                                                            "Current bytes transferred",
+                                                            "Total bytes to transfer"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.LONG,
+                                                                   SimpleType.LONG};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(),
+                                               "ProgressInfo",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], planId.toString());
+        valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
+        valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
+        valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
+        valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static ProgressInfo fromCompositeData(CompositeData cd)
+    {
+        Object[] values = cd.getAll(ITEM_NAMES);
+        try
+        {
+            return new ProgressInfo(InetAddress.getByName((String) values[1]),
+                                    (String) values[2],
+                                    ProgressInfo.Direction.valueOf((String)values[3]),
+                                    (long) values[4],
+                                    (long) values[5]);
+        }
+        catch (UnknownHostException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
new file mode 100644
index 0000000..3351e6e
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamEvent;
+
+public class SessionCompleteEventCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "success"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+                                                            "Session peer",
+                                                            "Indicates whether session was successful"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.BOOLEAN};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(StreamEvent.SessionCompleteEvent.class.getName(),
+                                               "SessionCompleteEvent",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(StreamEvent.SessionCompleteEvent event)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], event.planId.toString());
+        valueMap.put(ITEM_NAMES[1], event.peer.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], event.success);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
new file mode 100644
index 0000000..658facf
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class SessionInfoCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "receivingSummaries",
+                                                            "sendingSummaries",
+                                                            "state",
+                                                            "receivingFiles",
+                                                            "sendingFiles"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+                                                            "Session peer",
+                                                            "Summaries of receiving data",
+                                                            "Summaries of sending data",
+                                                            "Current session state",
+                                                            "Receiving files",
+                                                            "Sending files"};
+    private static final OpenType<?>[] ITEM_TYPES;
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                        SimpleType.STRING,
+                                        ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+                                        ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+                                        SimpleType.STRING,
+                                        ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
+                                        ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE)};
+            COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(),
+                                               "SessionInfo",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], planId.toString());
+        valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
+        Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
+        {
+            public CompositeData apply(StreamSummary input)
+            {
+                return StreamSummaryCompositeData.toCompositeData(input);
+            }
+        };
+        valueMap.put(ITEM_NAMES[2], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[4], sessionInfo.state.name());
+        Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
+        {
+            public CompositeData apply(ProgressInfo input)
+            {
+                return ProgressInfoCompositeData.toCompositeData(planId, input);
+            }
+        };
+        valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+        valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static SessionInfo fromCompositeData(CompositeData cd)
+    {
+        assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+
+        Object[] values = cd.getAll(ITEM_NAMES);
+        InetAddress peer;
+        try
+        {
+            peer = InetAddress.getByName((String) values[1]);
+        }
+        catch (UnknownHostException e)
+        {
+            throw Throwables.propagate(e);
+        }
+        Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>()
+        {
+            public StreamSummary apply(CompositeData input)
+            {
+                return StreamSummaryCompositeData.fromCompositeData(input);
+            }
+        };
+        SessionInfo info = new SessionInfo(peer,
+                                           fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
+                                           fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
+                                           StreamSession.State.valueOf((String) values[4]));
+        Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
+        {
+            public ProgressInfo apply(CompositeData input)
+            {
+                return ProgressInfoCompositeData.fromCompositeData(input);
+            }
+        };
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[5], toProgressInfo))
+        {
+            info.updateProgress(progress);
+        }
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
+        {
+            info.updateProgress(progress);
+        }
+        return info;
+    }
+
+    private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func)
+    {
+        return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func));
+    }
+
+    private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
+    {
+        CompositeData[] composites = new CompositeData[toConvert.size()];
+        return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
new file mode 100644
index 0000000..f8c54ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamEventJMXNotifier extends NotificationBroadcasterSupport implements StreamEventHandler
+{
+    private final AtomicLong seq = new AtomicLong();
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        Notification notif = null;
+        switch (event.eventType) {
+            case STREAM_PREPARED:
+                notif = new Notification(StreamEvent.SessionPreparedEvent.class.getCanonicalName(),
+                                                StreamManagerMBean.OBJECT_NAME,
+                                                seq.getAndIncrement());
+                notif.setUserData(SessionInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.SessionPreparedEvent) event).session));
+                break;
+            case STREAM_COMPLETE:
+                notif = new Notification(StreamEvent.SessionCompleteEvent.class.getCanonicalName(),
+                                                StreamManagerMBean.OBJECT_NAME,
+                                                seq.getAndIncrement());
+                notif.setUserData(SessionCompleteEventCompositeData.toCompositeData((StreamEvent.SessionCompleteEvent) event));
+                break;
+            case FILE_PROGRESS:
+                notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
+                                         StreamManagerMBean.OBJECT_NAME,
+                                         seq.getAndIncrement());
+                notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.ProgressEvent) event).progress));
+                break;
+        }
+        sendNotification(notif);
+    }
+
+    public void onSuccess(StreamState result)
+    {
+        Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".success",
+                                              StreamManagerMBean.OBJECT_NAME,
+                                              seq.getAndIncrement());
+        notif.setUserData(StreamStateCompositeData.toCompositeData(result));
+        sendNotification(notif);
+    }
+
+    public void onFailure(Throwable t)
+    {
+        Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".failure",
+                                              StreamManagerMBean.OBJECT_NAME,
+                                              seq.getAndIncrement());
+        notif.setUserData(t.fillInStackTrace().toString());
+        sendNotification(notif);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
new file mode 100644
index 0000000..820a71a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamStateCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream",
+                                                            "Stream plan description",
+                                                            "Active stream sessions"};
+    private static final OpenType<?>[] ITEM_TYPES;
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                         SimpleType.STRING,
+                                         ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE)};
+            COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(),
+                                            "StreamState",
+                                            ITEM_NAMES,
+                                            ITEM_DESCS,
+                                            ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(final StreamState streamState)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
+        valueMap.put(ITEM_NAMES[1], streamState.description);
+
+        CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
+        Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
+        {
+            public CompositeData apply(SessionInfo input)
+            {
+                return SessionInfoCompositeData.toCompositeData(streamState.planId, input);
+            }
+        })).toArray(sessions);
+        valueMap.put(ITEM_NAMES[2], sessions);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static StreamState fromCompositeData(CompositeData cd)
+    {
+        assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+        Object[] values = cd.getAll(ITEM_NAMES);
+        UUID planId = UUID.fromString((String) values[0]);
+        String description = (String) values[1];
+        Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
+                                                                        new Function<CompositeData, SessionInfo>()
+                                                                        {
+                                                                            public SessionInfo apply(CompositeData input)
+                                                                            {
+                                                                                return SessionInfoCompositeData.fromCompositeData(input);
+                                                                            }
+                                                                        }));
+        return new StreamState(planId, description, sessions);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
new file mode 100644
index 0000000..e93069c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamSummary;
+
+/**
+ */
+public class StreamSummaryCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"cfId",
+                                                            "files",
+                                                            "totalSize"};
+    private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
+                                                            "Number of files",
+                                                            "Total bytes of the files"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.INTEGER,
+                                                                   SimpleType.LONG};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(),
+                                               "StreamSummary",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(StreamSummary streamSummary)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
+        valueMap.put(ITEM_NAMES[1], streamSummary.files);
+        valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static StreamSummary fromCompositeData(CompositeData cd)
+    {
+        Object[] values = cd.getAll(ITEM_NAMES);
+        return new StreamSummary(UUID.fromString((String) values[0]),
+                                 (int) values[1],
+                                 (long) values[2]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 13624a2..0da2944 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -32,11 +32,14 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Condition;
 import javax.management.*;
+import javax.management.openmbean.CompositeData;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -52,6 +55,7 @@ import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
 import org.apache.cassandra.utils.SimpleCondition;
 
 /**
@@ -547,7 +551,13 @@ public class NodeProbe
 
     public Set<StreamState> getStreamStatus()
     {
-        return streamProxy.getCurrentStreams();
+        return Sets.newHashSet(Iterables.transform(streamProxy.getCurrentStreams(), new Function<CompositeData, StreamState>()
+        {
+            public StreamState apply(CompositeData input)
+            {
+                return StreamStateCompositeData.fromCompositeData(input);
+            }
+        }));
     }
 
     public String getOperationMode()


[5/9] git commit: suppress sending too many progress notifications

Posted by yu...@apache.org.
suppress sending too many progress notifications


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3479008
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3479008
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3479008

Branch: refs/heads/cassandra-2.0.0
Commit: a34790085ae7f79615f7225911ba9cf8ceb8f3e2
Parents: a0d6ed1
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 12 11:07:53 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:10:37 2013 -0500

----------------------------------------------------------------------
 .../management/StreamEventJMXNotifier.java      | 26 ++++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3479008/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
index f8c54ec..01ca9a1 100644
--- a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
+++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -21,17 +21,19 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamManagerMBean;
-import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.*;
 
 /**
  */
 public class StreamEventJMXNotifier extends NotificationBroadcasterSupport implements StreamEventHandler
 {
+    // interval in millisec to use for progress notification
+    private static final long PROGRESS_NOTIFICATION_INTERVAL = 1000;
+
     private final AtomicLong seq = new AtomicLong();
 
+    private long progressLastSent;
+
     public void handleStreamEvent(StreamEvent event)
     {
         Notification notif = null;
@@ -49,10 +51,18 @@ public class StreamEventJMXNotifier extends NotificationBroadcasterSupport imple
                 notif.setUserData(SessionCompleteEventCompositeData.toCompositeData((StreamEvent.SessionCompleteEvent) event));
                 break;
             case FILE_PROGRESS:
-                notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
-                                         StreamManagerMBean.OBJECT_NAME,
-                                         seq.getAndIncrement());
-                notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.ProgressEvent) event).progress));
+                ProgressInfo progress = ((StreamEvent.ProgressEvent) event).progress;
+                long current = System.currentTimeMillis();
+                if (current - progressLastSent >= PROGRESS_NOTIFICATION_INTERVAL || progress.isCompleted())
+                {
+                    notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
+                                             StreamManagerMBean.OBJECT_NAME,
+                                             seq.getAndIncrement());
+                    notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, progress));
+                    progressLastSent = System.currentTimeMillis();
+                } else {
+                    return;
+                }
                 break;
         }
         sendNotification(notif);


[3/9] git commit: Improve JMX support for streaming

Posted by yu...@apache.org.
Improve JMX support for streaming

patch by yukim; reviewed by thobbs for CASSANDRA-5859


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0d6ed12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0d6ed12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0d6ed12

Branch: refs/heads/trunk
Commit: a0d6ed1290540673b0336418cbca0dd2f07e64a8
Parents: 09a4dc0
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 8 14:30:16 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:09:56 2013 -0500

----------------------------------------------------------------------
 .../cassandra/streaming/StreamManager.java      |  40 ++++-
 .../cassandra/streaming/StreamManagerMBean.java |   6 +-
 .../management/ProgressInfoCompositeData.java   | 103 ++++++++++++
 .../SessionCompleteEventCompositeData.java      |  71 ++++++++
 .../management/SessionInfoCompositeData.java    | 163 +++++++++++++++++++
 .../management/StreamEventJMXNotifier.java      |  78 +++++++++
 .../management/StreamStateCompositeData.java    | 102 ++++++++++++
 .../management/StreamSummaryCompositeData.java  |  82 ++++++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  12 +-
 9 files changed, 650 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 5fc1c75..ccd0053 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -21,6 +21,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -29,6 +35,8 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
 
 /**
  * StreamManager manages currently running {@link StreamResultFuture}s and provides status of all operation invoked.
@@ -60,6 +68,8 @@ public class StreamManager implements StreamManagerMBean
         return limiter;
     }
 
+    private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
+
     /*
      * Currently running streams. Removed after completion/failure.
      * We manage them in two different maps to distinguish plan from initiated ones to
@@ -68,19 +78,20 @@ public class StreamManager implements StreamManagerMBean
     private final Map<UUID, StreamResultFuture> initiatedStreams = new NonBlockingHashMap<>();
     private final Map<UUID, StreamResultFuture> receivingStreams = new NonBlockingHashMap<>();
 
-    public Set<StreamState> getCurrentStreams()
+    public Set<CompositeData> getCurrentStreams()
     {
-        return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, StreamState>()
+        return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, CompositeData>()
         {
-            public StreamState apply(StreamResultFuture input)
+            public CompositeData apply(StreamResultFuture input)
             {
-                return input.getCurrentState();
+                return StreamStateCompositeData.toCompositeData(input.getCurrentState());
             }
         }));
     }
 
     public void register(final StreamResultFuture result)
     {
+        result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or not)
         result.addListener(new Runnable()
         {
@@ -95,6 +106,7 @@ public class StreamManager implements StreamManagerMBean
 
     public void registerReceiving(final StreamResultFuture result)
     {
+        result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or not)
         result.addListener(new Runnable()
         {
@@ -111,4 +123,24 @@ public class StreamManager implements StreamManagerMBean
     {
         return receivingStreams.get(planId);
     }
+
+    public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
+    {
+        notifier.addNotificationListener(listener, filter, handback);
+    }
+
+    public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException
+    {
+        notifier.removeNotificationListener(listener);
+    }
+
+    public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException
+    {
+        notifier.removeNotificationListener(listener, filter, handback);
+    }
+
+    public MBeanNotificationInfo[] getNotificationInfo()
+    {
+        return notifier.getNotificationInfo();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
index f338fb5..f329596 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -18,13 +18,15 @@
 package org.apache.cassandra.streaming;
 
 import java.util.Set;
+import javax.management.NotificationEmitter;
+import javax.management.openmbean.CompositeData;
 
-public interface StreamManagerMBean
+public interface StreamManagerMBean extends NotificationEmitter
 {
     public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
 
     /**
      * Returns the current state of all ongoing streams.
      */
-    Set<StreamState> getCurrentStreams();
+    Set<CompositeData> getCurrentStreams();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
new file mode 100644
index 0000000..b361b1b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+
+public class ProgressInfoCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "fileName",
+                                                            "direction",
+                                                            "currentBytes",
+                                                            "totalBytes"};
+    private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
+                                                            "Session peer",
+                                                            "Name of the file",
+                                                            "Direction('IN' or 'OUT')",
+                                                            "Current bytes transferred",
+                                                            "Total bytes to transfer"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.LONG,
+                                                                   SimpleType.LONG};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(),
+                                               "ProgressInfo",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], planId.toString());
+        valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
+        valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
+        valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
+        valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static ProgressInfo fromCompositeData(CompositeData cd)
+    {
+        Object[] values = cd.getAll(ITEM_NAMES);
+        try
+        {
+            return new ProgressInfo(InetAddress.getByName((String) values[1]),
+                                    (String) values[2],
+                                    ProgressInfo.Direction.valueOf((String)values[3]),
+                                    (long) values[4],
+                                    (long) values[5]);
+        }
+        catch (UnknownHostException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
new file mode 100644
index 0000000..3351e6e
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamEvent;
+
+public class SessionCompleteEventCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "success"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+                                                            "Session peer",
+                                                            "Indicates whether session was successful"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.BOOLEAN};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(StreamEvent.SessionCompleteEvent.class.getName(),
+                                               "SessionCompleteEvent",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(StreamEvent.SessionCompleteEvent event)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], event.planId.toString());
+        valueMap.put(ITEM_NAMES[1], event.peer.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], event.success);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
new file mode 100644
index 0000000..658facf
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class SessionInfoCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "receivingSummaries",
+                                                            "sendingSummaries",
+                                                            "state",
+                                                            "receivingFiles",
+                                                            "sendingFiles"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+                                                            "Session peer",
+                                                            "Summaries of receiving data",
+                                                            "Summaries of sending data",
+                                                            "Current session state",
+                                                            "Receiving files",
+                                                            "Sending files"};
+    private static final OpenType<?>[] ITEM_TYPES;
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                        SimpleType.STRING,
+                                        ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+                                        ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+                                        SimpleType.STRING,
+                                        ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
+                                        ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE)};
+            COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(),
+                                               "SessionInfo",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], planId.toString());
+        valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
+        Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
+        {
+            public CompositeData apply(StreamSummary input)
+            {
+                return StreamSummaryCompositeData.toCompositeData(input);
+            }
+        };
+        valueMap.put(ITEM_NAMES[2], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[4], sessionInfo.state.name());
+        Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
+        {
+            public CompositeData apply(ProgressInfo input)
+            {
+                return ProgressInfoCompositeData.toCompositeData(planId, input);
+            }
+        };
+        valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+        valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static SessionInfo fromCompositeData(CompositeData cd)
+    {
+        assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+
+        Object[] values = cd.getAll(ITEM_NAMES);
+        InetAddress peer;
+        try
+        {
+            peer = InetAddress.getByName((String) values[1]);
+        }
+        catch (UnknownHostException e)
+        {
+            throw Throwables.propagate(e);
+        }
+        Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>()
+        {
+            public StreamSummary apply(CompositeData input)
+            {
+                return StreamSummaryCompositeData.fromCompositeData(input);
+            }
+        };
+        SessionInfo info = new SessionInfo(peer,
+                                           fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
+                                           fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
+                                           StreamSession.State.valueOf((String) values[4]));
+        Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
+        {
+            public ProgressInfo apply(CompositeData input)
+            {
+                return ProgressInfoCompositeData.fromCompositeData(input);
+            }
+        };
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[5], toProgressInfo))
+        {
+            info.updateProgress(progress);
+        }
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
+        {
+            info.updateProgress(progress);
+        }
+        return info;
+    }
+
+    private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func)
+    {
+        return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func));
+    }
+
+    private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
+    {
+        CompositeData[] composites = new CompositeData[toConvert.size()];
+        return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
new file mode 100644
index 0000000..f8c54ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamEventJMXNotifier extends NotificationBroadcasterSupport implements StreamEventHandler
+{
+    private final AtomicLong seq = new AtomicLong();
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        Notification notif = null;
+        switch (event.eventType) {
+            case STREAM_PREPARED:
+                notif = new Notification(StreamEvent.SessionPreparedEvent.class.getCanonicalName(),
+                                                StreamManagerMBean.OBJECT_NAME,
+                                                seq.getAndIncrement());
+                notif.setUserData(SessionInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.SessionPreparedEvent) event).session));
+                break;
+            case STREAM_COMPLETE:
+                notif = new Notification(StreamEvent.SessionCompleteEvent.class.getCanonicalName(),
+                                                StreamManagerMBean.OBJECT_NAME,
+                                                seq.getAndIncrement());
+                notif.setUserData(SessionCompleteEventCompositeData.toCompositeData((StreamEvent.SessionCompleteEvent) event));
+                break;
+            case FILE_PROGRESS:
+                notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
+                                         StreamManagerMBean.OBJECT_NAME,
+                                         seq.getAndIncrement());
+                notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.ProgressEvent) event).progress));
+                break;
+        }
+        sendNotification(notif);
+    }
+
+    public void onSuccess(StreamState result)
+    {
+        Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".success",
+                                              StreamManagerMBean.OBJECT_NAME,
+                                              seq.getAndIncrement());
+        notif.setUserData(StreamStateCompositeData.toCompositeData(result));
+        sendNotification(notif);
+    }
+
+    public void onFailure(Throwable t)
+    {
+        Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".failure",
+                                              StreamManagerMBean.OBJECT_NAME,
+                                              seq.getAndIncrement());
+        notif.setUserData(t.fillInStackTrace().toString());
+        sendNotification(notif);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
new file mode 100644
index 0000000..820a71a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamStateCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream",
+                                                            "Stream plan description",
+                                                            "Active stream sessions"};
+    private static final OpenType<?>[] ITEM_TYPES;
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                         SimpleType.STRING,
+                                         ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE)};
+            COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(),
+                                            "StreamState",
+                                            ITEM_NAMES,
+                                            ITEM_DESCS,
+                                            ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(final StreamState streamState)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
+        valueMap.put(ITEM_NAMES[1], streamState.description);
+
+        CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
+        Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
+        {
+            public CompositeData apply(SessionInfo input)
+            {
+                return SessionInfoCompositeData.toCompositeData(streamState.planId, input);
+            }
+        })).toArray(sessions);
+        valueMap.put(ITEM_NAMES[2], sessions);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static StreamState fromCompositeData(CompositeData cd)
+    {
+        assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+        Object[] values = cd.getAll(ITEM_NAMES);
+        UUID planId = UUID.fromString((String) values[0]);
+        String description = (String) values[1];
+        Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
+                                                                        new Function<CompositeData, SessionInfo>()
+                                                                        {
+                                                                            public SessionInfo apply(CompositeData input)
+                                                                            {
+                                                                                return SessionInfoCompositeData.fromCompositeData(input);
+                                                                            }
+                                                                        }));
+        return new StreamState(planId, description, sessions);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
new file mode 100644
index 0000000..e93069c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamSummary;
+
+/**
+ */
+public class StreamSummaryCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"cfId",
+                                                            "files",
+                                                            "totalSize"};
+    private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
+                                                            "Number of files",
+                                                            "Total bytes of the files"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.INTEGER,
+                                                                   SimpleType.LONG};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(),
+                                               "StreamSummary",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(StreamSummary streamSummary)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
+        valueMap.put(ITEM_NAMES[1], streamSummary.files);
+        valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static StreamSummary fromCompositeData(CompositeData cd)
+    {
+        Object[] values = cd.getAll(ITEM_NAMES);
+        return new StreamSummary(UUID.fromString((String) values[0]),
+                                 (int) values[1],
+                                 (long) values[2]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 13624a2..0da2944 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -32,11 +32,14 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Condition;
 import javax.management.*;
+import javax.management.openmbean.CompositeData;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -52,6 +55,7 @@ import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
 import org.apache.cassandra.utils.SimpleCondition;
 
 /**
@@ -547,7 +551,13 @@ public class NodeProbe
 
     public Set<StreamState> getStreamStatus()
     {
-        return streamProxy.getCurrentStreams();
+        return Sets.newHashSet(Iterables.transform(streamProxy.getCurrentStreams(), new Function<CompositeData, StreamState>()
+        {
+            public StreamState apply(CompositeData input)
+            {
+                return StreamStateCompositeData.fromCompositeData(input);
+            }
+        }));
     }
 
     public String getOperationMode()


[9/9] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into trunk


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cb349d9e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cb349d9e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cb349d9e

Branch: refs/heads/trunk
Commit: cb349d9e7cb808796b1f65984c92ccae28e566d9
Parents: d9a64a4 839cc33
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 12 11:12:39 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:12:39 2013 -0500

----------------------------------------------------------------------
 .../cassandra/streaming/StreamManager.java      |  40 ++++-
 .../cassandra/streaming/StreamManagerMBean.java |   6 +-
 .../management/ProgressInfoCompositeData.java   | 103 ++++++++++++
 .../SessionCompleteEventCompositeData.java      |  71 ++++++++
 .../management/SessionInfoCompositeData.java    | 163 +++++++++++++++++++
 .../management/StreamEventJMXNotifier.java      |  88 ++++++++++
 .../management/StreamStateCompositeData.java    | 102 ++++++++++++
 .../management/StreamSummaryCompositeData.java  |  82 ++++++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  12 +-
 9 files changed, 660 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[4/9] git commit: suppress sending too many progress notifications

Posted by yu...@apache.org.
suppress sending too many progress notifications


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3479008
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3479008
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3479008

Branch: refs/heads/cassandra-2.0
Commit: a34790085ae7f79615f7225911ba9cf8ceb8f3e2
Parents: a0d6ed1
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 12 11:07:53 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:10:37 2013 -0500

----------------------------------------------------------------------
 .../management/StreamEventJMXNotifier.java      | 26 ++++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3479008/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
index f8c54ec..01ca9a1 100644
--- a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
+++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -21,17 +21,19 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamManagerMBean;
-import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.*;
 
 /**
  */
 public class StreamEventJMXNotifier extends NotificationBroadcasterSupport implements StreamEventHandler
 {
+    // interval in millisec to use for progress notification
+    private static final long PROGRESS_NOTIFICATION_INTERVAL = 1000;
+
     private final AtomicLong seq = new AtomicLong();
 
+    private long progressLastSent;
+
     public void handleStreamEvent(StreamEvent event)
     {
         Notification notif = null;
@@ -49,10 +51,18 @@ public class StreamEventJMXNotifier extends NotificationBroadcasterSupport imple
                 notif.setUserData(SessionCompleteEventCompositeData.toCompositeData((StreamEvent.SessionCompleteEvent) event));
                 break;
             case FILE_PROGRESS:
-                notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
-                                         StreamManagerMBean.OBJECT_NAME,
-                                         seq.getAndIncrement());
-                notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.ProgressEvent) event).progress));
+                ProgressInfo progress = ((StreamEvent.ProgressEvent) event).progress;
+                long current = System.currentTimeMillis();
+                if (current - progressLastSent >= PROGRESS_NOTIFICATION_INTERVAL || progress.isCompleted())
+                {
+                    notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
+                                             StreamManagerMBean.OBJECT_NAME,
+                                             seq.getAndIncrement());
+                    notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, progress));
+                    progressLastSent = System.currentTimeMillis();
+                } else {
+                    return;
+                }
                 break;
         }
         sendNotification(notif);


[8/9] git commit: Merge branch 'cassandra-2.0.0' into cassandra-2.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0.0' into cassandra-2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/839cc330
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/839cc330
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/839cc330

Branch: refs/heads/cassandra-2.0
Commit: 839cc330712964783d5ef817cf62d54edda86e7b
Parents: b87270b a347900
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 12 11:11:00 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:11:00 2013 -0500

----------------------------------------------------------------------
 .../cassandra/streaming/StreamManager.java      |  40 ++++-
 .../cassandra/streaming/StreamManagerMBean.java |   6 +-
 .../management/ProgressInfoCompositeData.java   | 103 ++++++++++++
 .../SessionCompleteEventCompositeData.java      |  71 ++++++++
 .../management/SessionInfoCompositeData.java    | 163 +++++++++++++++++++
 .../management/StreamEventJMXNotifier.java      |  88 ++++++++++
 .../management/StreamStateCompositeData.java    | 102 ++++++++++++
 .../management/StreamSummaryCompositeData.java  |  82 ++++++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  12 +-
 9 files changed, 660 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[6/9] git commit: suppress sending too many progress notifications

Posted by yu...@apache.org.
suppress sending too many progress notifications


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a3479008
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a3479008
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a3479008

Branch: refs/heads/trunk
Commit: a34790085ae7f79615f7225911ba9cf8ceb8f3e2
Parents: a0d6ed1
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 12 11:07:53 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:10:37 2013 -0500

----------------------------------------------------------------------
 .../management/StreamEventJMXNotifier.java      | 26 ++++++++++++++------
 1 file changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a3479008/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
index f8c54ec..01ca9a1 100644
--- a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
+++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -21,17 +21,19 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.management.Notification;
 import javax.management.NotificationBroadcasterSupport;
 
-import org.apache.cassandra.streaming.StreamEvent;
-import org.apache.cassandra.streaming.StreamEventHandler;
-import org.apache.cassandra.streaming.StreamManagerMBean;
-import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.streaming.*;
 
 /**
  */
 public class StreamEventJMXNotifier extends NotificationBroadcasterSupport implements StreamEventHandler
 {
+    // interval in millisec to use for progress notification
+    private static final long PROGRESS_NOTIFICATION_INTERVAL = 1000;
+
     private final AtomicLong seq = new AtomicLong();
 
+    private long progressLastSent;
+
     public void handleStreamEvent(StreamEvent event)
     {
         Notification notif = null;
@@ -49,10 +51,18 @@ public class StreamEventJMXNotifier extends NotificationBroadcasterSupport imple
                 notif.setUserData(SessionCompleteEventCompositeData.toCompositeData((StreamEvent.SessionCompleteEvent) event));
                 break;
             case FILE_PROGRESS:
-                notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
-                                         StreamManagerMBean.OBJECT_NAME,
-                                         seq.getAndIncrement());
-                notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.ProgressEvent) event).progress));
+                ProgressInfo progress = ((StreamEvent.ProgressEvent) event).progress;
+                long current = System.currentTimeMillis();
+                if (current - progressLastSent >= PROGRESS_NOTIFICATION_INTERVAL || progress.isCompleted())
+                {
+                    notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
+                                             StreamManagerMBean.OBJECT_NAME,
+                                             seq.getAndIncrement());
+                    notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, progress));
+                    progressLastSent = System.currentTimeMillis();
+                } else {
+                    return;
+                }
                 break;
         }
         sendNotification(notif);


[2/9] git commit: Improve JMX support for streaming

Posted by yu...@apache.org.
Improve JMX support for streaming

patch by yukim; reviewed by thobbs for CASSANDRA-5859


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0d6ed12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0d6ed12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0d6ed12

Branch: refs/heads/cassandra-2.0.0
Commit: a0d6ed1290540673b0336418cbca0dd2f07e64a8
Parents: 09a4dc0
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 8 14:30:16 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:09:56 2013 -0500

----------------------------------------------------------------------
 .../cassandra/streaming/StreamManager.java      |  40 ++++-
 .../cassandra/streaming/StreamManagerMBean.java |   6 +-
 .../management/ProgressInfoCompositeData.java   | 103 ++++++++++++
 .../SessionCompleteEventCompositeData.java      |  71 ++++++++
 .../management/SessionInfoCompositeData.java    | 163 +++++++++++++++++++
 .../management/StreamEventJMXNotifier.java      |  78 +++++++++
 .../management/StreamStateCompositeData.java    | 102 ++++++++++++
 .../management/StreamSummaryCompositeData.java  |  82 ++++++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  12 +-
 9 files changed, 650 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 5fc1c75..ccd0053 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -21,6 +21,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
@@ -29,6 +35,8 @@ import com.google.common.util.concurrent.RateLimiter;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
 
 /**
  * StreamManager manages currently running {@link StreamResultFuture}s and provides status of all operation invoked.
@@ -60,6 +68,8 @@ public class StreamManager implements StreamManagerMBean
         return limiter;
     }
 
+    private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
+
     /*
      * Currently running streams. Removed after completion/failure.
      * We manage them in two different maps to distinguish plan from initiated ones to
@@ -68,19 +78,20 @@ public class StreamManager implements StreamManagerMBean
     private final Map<UUID, StreamResultFuture> initiatedStreams = new NonBlockingHashMap<>();
     private final Map<UUID, StreamResultFuture> receivingStreams = new NonBlockingHashMap<>();
 
-    public Set<StreamState> getCurrentStreams()
+    public Set<CompositeData> getCurrentStreams()
     {
-        return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, StreamState>()
+        return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, CompositeData>()
         {
-            public StreamState apply(StreamResultFuture input)
+            public CompositeData apply(StreamResultFuture input)
             {
-                return input.getCurrentState();
+                return StreamStateCompositeData.toCompositeData(input.getCurrentState());
             }
         }));
     }
 
     public void register(final StreamResultFuture result)
     {
+        result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or not)
         result.addListener(new Runnable()
         {
@@ -95,6 +106,7 @@ public class StreamManager implements StreamManagerMBean
 
     public void registerReceiving(final StreamResultFuture result)
     {
+        result.addEventListener(notifier);
         // Make sure we remove the stream on completion (whether successful or not)
         result.addListener(new Runnable()
         {
@@ -111,4 +123,24 @@ public class StreamManager implements StreamManagerMBean
     {
         return receivingStreams.get(planId);
     }
+
+    public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
+    {
+        notifier.addNotificationListener(listener, filter, handback);
+    }
+
+    public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException
+    {
+        notifier.removeNotificationListener(listener);
+    }
+
+    public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException
+    {
+        notifier.removeNotificationListener(listener, filter, handback);
+    }
+
+    public MBeanNotificationInfo[] getNotificationInfo()
+    {
+        return notifier.getNotificationInfo();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
index f338fb5..f329596 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -18,13 +18,15 @@
 package org.apache.cassandra.streaming;
 
 import java.util.Set;
+import javax.management.NotificationEmitter;
+import javax.management.openmbean.CompositeData;
 
-public interface StreamManagerMBean
+public interface StreamManagerMBean extends NotificationEmitter
 {
     public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
 
     /**
      * Returns the current state of all ongoing streams.
      */
-    Set<StreamState> getCurrentStreams();
+    Set<CompositeData> getCurrentStreams();
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
new file mode 100644
index 0000000..b361b1b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+
+public class ProgressInfoCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "fileName",
+                                                            "direction",
+                                                            "currentBytes",
+                                                            "totalBytes"};
+    private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
+                                                            "Session peer",
+                                                            "Name of the file",
+                                                            "Direction('IN' or 'OUT')",
+                                                            "Current bytes transferred",
+                                                            "Total bytes to transfer"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.LONG,
+                                                                   SimpleType.LONG};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(),
+                                               "ProgressInfo",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], planId.toString());
+        valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
+        valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
+        valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
+        valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static ProgressInfo fromCompositeData(CompositeData cd)
+    {
+        Object[] values = cd.getAll(ITEM_NAMES);
+        try
+        {
+            return new ProgressInfo(InetAddress.getByName((String) values[1]),
+                                    (String) values[2],
+                                    ProgressInfo.Direction.valueOf((String)values[3]),
+                                    (long) values[4],
+                                    (long) values[5]);
+        }
+        catch (UnknownHostException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
new file mode 100644
index 0000000..3351e6e
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamEvent;
+
+public class SessionCompleteEventCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "success"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+                                                            "Session peer",
+                                                            "Indicates whether session was successful"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.STRING,
+                                                                   SimpleType.BOOLEAN};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(StreamEvent.SessionCompleteEvent.class.getName(),
+                                               "SessionCompleteEvent",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(StreamEvent.SessionCompleteEvent event)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], event.planId.toString());
+        valueMap.put(ITEM_NAMES[1], event.peer.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], event.success);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
new file mode 100644
index 0000000..658facf
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class SessionInfoCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId",
+                                                            "peer",
+                                                            "receivingSummaries",
+                                                            "sendingSummaries",
+                                                            "state",
+                                                            "receivingFiles",
+                                                            "sendingFiles"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+                                                            "Session peer",
+                                                            "Summaries of receiving data",
+                                                            "Summaries of sending data",
+                                                            "Current session state",
+                                                            "Receiving files",
+                                                            "Sending files"};
+    private static final OpenType<?>[] ITEM_TYPES;
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                        SimpleType.STRING,
+                                        ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+                                        ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+                                        SimpleType.STRING,
+                                        ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
+                                        ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE)};
+            COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(),
+                                               "SessionInfo",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], planId.toString());
+        valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
+        Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
+        {
+            public CompositeData apply(StreamSummary input)
+            {
+                return StreamSummaryCompositeData.toCompositeData(input);
+            }
+        };
+        valueMap.put(ITEM_NAMES[2], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[4], sessionInfo.state.name());
+        Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
+        {
+            public CompositeData apply(ProgressInfo input)
+            {
+                return ProgressInfoCompositeData.toCompositeData(planId, input);
+            }
+        };
+        valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+        valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static SessionInfo fromCompositeData(CompositeData cd)
+    {
+        assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+
+        Object[] values = cd.getAll(ITEM_NAMES);
+        InetAddress peer;
+        try
+        {
+            peer = InetAddress.getByName((String) values[1]);
+        }
+        catch (UnknownHostException e)
+        {
+            throw Throwables.propagate(e);
+        }
+        Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>()
+        {
+            public StreamSummary apply(CompositeData input)
+            {
+                return StreamSummaryCompositeData.fromCompositeData(input);
+            }
+        };
+        SessionInfo info = new SessionInfo(peer,
+                                           fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
+                                           fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
+                                           StreamSession.State.valueOf((String) values[4]));
+        Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
+        {
+            public ProgressInfo apply(CompositeData input)
+            {
+                return ProgressInfoCompositeData.fromCompositeData(input);
+            }
+        };
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[5], toProgressInfo))
+        {
+            info.updateProgress(progress);
+        }
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
+        {
+            info.updateProgress(progress);
+        }
+        return info;
+    }
+
+    private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func)
+    {
+        return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func));
+    }
+
+    private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
+    {
+        CompositeData[] composites = new CompositeData[toConvert.size()];
+        return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
new file mode 100644
index 0000000..f8c54ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamEventJMXNotifier extends NotificationBroadcasterSupport implements StreamEventHandler
+{
+    private final AtomicLong seq = new AtomicLong();
+
+    public void handleStreamEvent(StreamEvent event)
+    {
+        Notification notif = null;
+        switch (event.eventType) {
+            case STREAM_PREPARED:
+                notif = new Notification(StreamEvent.SessionPreparedEvent.class.getCanonicalName(),
+                                                StreamManagerMBean.OBJECT_NAME,
+                                                seq.getAndIncrement());
+                notif.setUserData(SessionInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.SessionPreparedEvent) event).session));
+                break;
+            case STREAM_COMPLETE:
+                notif = new Notification(StreamEvent.SessionCompleteEvent.class.getCanonicalName(),
+                                                StreamManagerMBean.OBJECT_NAME,
+                                                seq.getAndIncrement());
+                notif.setUserData(SessionCompleteEventCompositeData.toCompositeData((StreamEvent.SessionCompleteEvent) event));
+                break;
+            case FILE_PROGRESS:
+                notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
+                                         StreamManagerMBean.OBJECT_NAME,
+                                         seq.getAndIncrement());
+                notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.ProgressEvent) event).progress));
+                break;
+        }
+        sendNotification(notif);
+    }
+
+    public void onSuccess(StreamState result)
+    {
+        Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".success",
+                                              StreamManagerMBean.OBJECT_NAME,
+                                              seq.getAndIncrement());
+        notif.setUserData(StreamStateCompositeData.toCompositeData(result));
+        sendNotification(notif);
+    }
+
+    public void onFailure(Throwable t)
+    {
+        Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".failure",
+                                              StreamManagerMBean.OBJECT_NAME,
+                                              seq.getAndIncrement());
+        notif.setUserData(t.fillInStackTrace().toString());
+        sendNotification(notif);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
new file mode 100644
index 0000000..820a71a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamStateCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions"};
+    private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream",
+                                                            "Stream plan description",
+                                                            "Active stream sessions"};
+    private static final OpenType<?>[] ITEM_TYPES;
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                         SimpleType.STRING,
+                                         ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE)};
+            COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(),
+                                            "StreamState",
+                                            ITEM_NAMES,
+                                            ITEM_DESCS,
+                                            ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(final StreamState streamState)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
+        valueMap.put(ITEM_NAMES[1], streamState.description);
+
+        CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
+        Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
+        {
+            public CompositeData apply(SessionInfo input)
+            {
+                return SessionInfoCompositeData.toCompositeData(streamState.planId, input);
+            }
+        })).toArray(sessions);
+        valueMap.put(ITEM_NAMES[2], sessions);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static StreamState fromCompositeData(CompositeData cd)
+    {
+        assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+        Object[] values = cd.getAll(ITEM_NAMES);
+        UUID planId = UUID.fromString((String) values[0]);
+        String description = (String) values[1];
+        Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
+                                                                        new Function<CompositeData, SessionInfo>()
+                                                                        {
+                                                                            public SessionInfo apply(CompositeData input)
+                                                                            {
+                                                                                return SessionInfoCompositeData.fromCompositeData(input);
+                                                                            }
+                                                                        }));
+        return new StreamState(planId, description, sessions);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
new file mode 100644
index 0000000..e93069c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamSummary;
+
+/**
+ */
+public class StreamSummaryCompositeData
+{
+    private static final String[] ITEM_NAMES = new String[]{"cfId",
+                                                            "files",
+                                                            "totalSize"};
+    private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
+                                                            "Number of files",
+                                                            "Total bytes of the files"};
+    private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+                                                                   SimpleType.INTEGER,
+                                                                   SimpleType.LONG};
+
+    public static final CompositeType COMPOSITE_TYPE;
+    static  {
+        try
+        {
+            COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(),
+                                               "StreamSummary",
+                                               ITEM_NAMES,
+                                               ITEM_DESCS,
+                                               ITEM_TYPES);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static CompositeData toCompositeData(StreamSummary streamSummary)
+    {
+        Map<String, Object> valueMap = new HashMap<>();
+        valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
+        valueMap.put(ITEM_NAMES[1], streamSummary.files);
+        valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
+        try
+        {
+            return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+        }
+        catch (OpenDataException e)
+        {
+            throw Throwables.propagate(e);
+        }
+    }
+
+    public static StreamSummary fromCompositeData(CompositeData cd)
+    {
+        Object[] values = cd.getAll(ITEM_NAMES);
+        return new StreamSummary(UUID.fromString((String) values[0]),
+                                 (int) values[1],
+                                 (long) values[2]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 13624a2..0da2944 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -32,11 +32,14 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.Condition;
 import javax.management.*;
+import javax.management.openmbean.CompositeData;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
 import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -52,6 +55,7 @@ import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
 import org.apache.cassandra.utils.SimpleCondition;
 
 /**
@@ -547,7 +551,13 @@ public class NodeProbe
 
     public Set<StreamState> getStreamStatus()
     {
-        return streamProxy.getCurrentStreams();
+        return Sets.newHashSet(Iterables.transform(streamProxy.getCurrentStreams(), new Function<CompositeData, StreamState>()
+        {
+            public StreamState apply(CompositeData input)
+            {
+                return StreamStateCompositeData.fromCompositeData(input);
+            }
+        }));
     }
 
     public String getOperationMode()


[7/9] git commit: Merge branch 'cassandra-2.0.0' into cassandra-2.0

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0.0' into cassandra-2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/839cc330
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/839cc330
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/839cc330

Branch: refs/heads/trunk
Commit: 839cc330712964783d5ef817cf62d54edda86e7b
Parents: b87270b a347900
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Aug 12 11:11:00 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:11:00 2013 -0500

----------------------------------------------------------------------
 .../cassandra/streaming/StreamManager.java      |  40 ++++-
 .../cassandra/streaming/StreamManagerMBean.java |   6 +-
 .../management/ProgressInfoCompositeData.java   | 103 ++++++++++++
 .../SessionCompleteEventCompositeData.java      |  71 ++++++++
 .../management/SessionInfoCompositeData.java    | 163 +++++++++++++++++++
 .../management/StreamEventJMXNotifier.java      |  88 ++++++++++
 .../management/StreamStateCompositeData.java    | 102 ++++++++++++
 .../management/StreamSummaryCompositeData.java  |  82 ++++++++++
 .../org/apache/cassandra/tools/NodeProbe.java   |  12 +-
 9 files changed, 660 insertions(+), 7 deletions(-)
----------------------------------------------------------------------