You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/08/12 18:14:16 UTC
[2/9] git commit: Improve JMX support for streaming
Improve JMX support for streaming
patch by yukim; reviewed by thobbs for CASSANDRA-5859
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a0d6ed12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a0d6ed12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a0d6ed12
Branch: refs/heads/cassandra-2.0.0
Commit: a0d6ed1290540673b0336418cbca0dd2f07e64a8
Parents: 09a4dc0
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Aug 8 14:30:16 2013 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Aug 12 11:09:56 2013 -0500
----------------------------------------------------------------------
.../cassandra/streaming/StreamManager.java | 40 ++++-
.../cassandra/streaming/StreamManagerMBean.java | 6 +-
.../management/ProgressInfoCompositeData.java | 103 ++++++++++++
.../SessionCompleteEventCompositeData.java | 71 ++++++++
.../management/SessionInfoCompositeData.java | 163 +++++++++++++++++++
.../management/StreamEventJMXNotifier.java | 78 +++++++++
.../management/StreamStateCompositeData.java | 102 ++++++++++++
.../management/StreamSummaryCompositeData.java | 82 ++++++++++
.../org/apache/cassandra/tools/NodeProbe.java | 12 +-
9 files changed, 650 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java
index 5fc1c75..ccd0053 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManager.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManager.java
@@ -21,6 +21,12 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanNotificationInfo;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.openmbean.CompositeData;
+
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -29,6 +35,8 @@ import com.google.common.util.concurrent.RateLimiter;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.streaming.management.StreamEventJMXNotifier;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
/**
* StreamManager manages currently running {@link StreamResultFuture}s and provides status of all operation invoked.
@@ -60,6 +68,8 @@ public class StreamManager implements StreamManagerMBean
return limiter;
}
+ private final StreamEventJMXNotifier notifier = new StreamEventJMXNotifier();
+
/*
* Currently running streams. Removed after completion/failure.
* We manage them in two different maps to distinguish plan from initiated ones to
@@ -68,19 +78,20 @@ public class StreamManager implements StreamManagerMBean
private final Map<UUID, StreamResultFuture> initiatedStreams = new NonBlockingHashMap<>();
private final Map<UUID, StreamResultFuture> receivingStreams = new NonBlockingHashMap<>();
- public Set<StreamState> getCurrentStreams()
+ public Set<CompositeData> getCurrentStreams()
{
- return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, StreamState>()
+ return Sets.newHashSet(Iterables.transform(Iterables.concat(initiatedStreams.values(), receivingStreams.values()), new Function<StreamResultFuture, CompositeData>()
{
- public StreamState apply(StreamResultFuture input)
+ public CompositeData apply(StreamResultFuture input)
{
- return input.getCurrentState();
+ return StreamStateCompositeData.toCompositeData(input.getCurrentState());
}
}));
}
public void register(final StreamResultFuture result)
{
+ result.addEventListener(notifier);
// Make sure we remove the stream on completion (whether successful or not)
result.addListener(new Runnable()
{
@@ -95,6 +106,7 @@ public class StreamManager implements StreamManagerMBean
public void registerReceiving(final StreamResultFuture result)
{
+ result.addEventListener(notifier);
// Make sure we remove the stream on completion (whether successful or not)
result.addListener(new Runnable()
{
@@ -111,4 +123,24 @@ public class StreamManager implements StreamManagerMBean
{
return receivingStreams.get(planId);
}
+
+ public void addNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback)
+ {
+ notifier.addNotificationListener(listener, filter, handback);
+ }
+
+ public void removeNotificationListener(NotificationListener listener) throws ListenerNotFoundException
+ {
+ notifier.removeNotificationListener(listener);
+ }
+
+ public void removeNotificationListener(NotificationListener listener, NotificationFilter filter, Object handback) throws ListenerNotFoundException
+ {
+ notifier.removeNotificationListener(listener, filter, handback);
+ }
+
+ public MBeanNotificationInfo[] getNotificationInfo()
+ {
+ return notifier.getNotificationInfo();
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
index f338fb5..f329596 100644
--- a/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
+++ b/src/java/org/apache/cassandra/streaming/StreamManagerMBean.java
@@ -18,13 +18,15 @@
package org.apache.cassandra.streaming;
import java.util.Set;
+import javax.management.NotificationEmitter;
+import javax.management.openmbean.CompositeData;
-public interface StreamManagerMBean
+public interface StreamManagerMBean extends NotificationEmitter
{
public static final String OBJECT_NAME = "org.apache.cassandra.net:type=StreamManager";
/**
* Returns the current state of all ongoing streams.
*/
- Set<StreamState> getCurrentStreams();
+ Set<CompositeData> getCurrentStreams();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
new file mode 100644
index 0000000..b361b1b
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/ProgressInfoCompositeData.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+
+public class ProgressInfoCompositeData
+{
+ private static final String[] ITEM_NAMES = new String[]{"planId",
+ "peer",
+ "fileName",
+ "direction",
+ "currentBytes",
+ "totalBytes"};
+ private static final String[] ITEM_DESCS = new String[]{"String representation of Plan ID",
+ "Session peer",
+ "Name of the file",
+ "Direction('IN' or 'OUT')",
+ "Current bytes transferred",
+ "Total bytes to transfer"};
+ private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+ SimpleType.STRING,
+ SimpleType.STRING,
+ SimpleType.STRING,
+ SimpleType.LONG,
+ SimpleType.LONG};
+
+ public static final CompositeType COMPOSITE_TYPE;
+ static {
+ try
+ {
+ COMPOSITE_TYPE = new CompositeType(ProgressInfo.class.getName(),
+ "ProgressInfo",
+ ITEM_NAMES,
+ ITEM_DESCS,
+ ITEM_TYPES);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static CompositeData toCompositeData(UUID planId, ProgressInfo progressInfo)
+ {
+ Map<String, Object> valueMap = new HashMap<>();
+ valueMap.put(ITEM_NAMES[0], planId.toString());
+ valueMap.put(ITEM_NAMES[1], progressInfo.peer.getHostAddress());
+ valueMap.put(ITEM_NAMES[2], progressInfo.fileName);
+ valueMap.put(ITEM_NAMES[3], progressInfo.direction.name());
+ valueMap.put(ITEM_NAMES[4], progressInfo.currentBytes);
+ valueMap.put(ITEM_NAMES[5], progressInfo.totalBytes);
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static ProgressInfo fromCompositeData(CompositeData cd)
+ {
+ Object[] values = cd.getAll(ITEM_NAMES);
+ try
+ {
+ return new ProgressInfo(InetAddress.getByName((String) values[1]),
+ (String) values[2],
+ ProgressInfo.Direction.valueOf((String)values[3]),
+ (long) values[4],
+ (long) values[5]);
+ }
+ catch (UnknownHostException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
new file mode 100644
index 0000000..3351e6e
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionCompleteEventCompositeData.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamEvent;
+
+public class SessionCompleteEventCompositeData
+{
+ private static final String[] ITEM_NAMES = new String[]{"planId",
+ "peer",
+ "success"};
+ private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+ "Session peer",
+ "Indicates whether session was successful"};
+ private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+ SimpleType.STRING,
+ SimpleType.BOOLEAN};
+
+ public static final CompositeType COMPOSITE_TYPE;
+ static {
+ try
+ {
+ COMPOSITE_TYPE = new CompositeType(StreamEvent.SessionCompleteEvent.class.getName(),
+ "SessionCompleteEvent",
+ ITEM_NAMES,
+ ITEM_DESCS,
+ ITEM_TYPES);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static CompositeData toCompositeData(StreamEvent.SessionCompleteEvent event)
+ {
+ Map<String, Object> valueMap = new HashMap<>();
+ valueMap.put(ITEM_NAMES[0], event.planId.toString());
+ valueMap.put(ITEM_NAMES[1], event.peer.getHostAddress());
+ valueMap.put(ITEM_NAMES[2], event.success);
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
new file mode 100644
index 0000000..658facf
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.streaming.ProgressInfo;
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamSession;
+import org.apache.cassandra.streaming.StreamSummary;
+
+public class SessionInfoCompositeData
+{
+ private static final String[] ITEM_NAMES = new String[]{"planId",
+ "peer",
+ "receivingSummaries",
+ "sendingSummaries",
+ "state",
+ "receivingFiles",
+ "sendingFiles"};
+ private static final String[] ITEM_DESCS = new String[]{"Plan ID",
+ "Session peer",
+ "Summaries of receiving data",
+ "Summaries of sending data",
+ "Current session state",
+ "Receiving files",
+ "Sending files"};
+ private static final OpenType<?>[] ITEM_TYPES;
+
+ public static final CompositeType COMPOSITE_TYPE;
+ static {
+ try
+ {
+ ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+ SimpleType.STRING,
+ ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+ ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
+ SimpleType.STRING,
+ ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE),
+ ArrayType.getArrayType(ProgressInfoCompositeData.COMPOSITE_TYPE)};
+ COMPOSITE_TYPE = new CompositeType(SessionInfo.class.getName(),
+ "SessionInfo",
+ ITEM_NAMES,
+ ITEM_DESCS,
+ ITEM_TYPES);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static CompositeData toCompositeData(final UUID planId, SessionInfo sessionInfo)
+ {
+ Map<String, Object> valueMap = new HashMap<>();
+ valueMap.put(ITEM_NAMES[0], planId.toString());
+ valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
+ Function<StreamSummary, CompositeData> fromStreamSummary = new Function<StreamSummary, CompositeData>()
+ {
+ public CompositeData apply(StreamSummary input)
+ {
+ return StreamSummaryCompositeData.toCompositeData(input);
+ }
+ };
+ valueMap.put(ITEM_NAMES[2], toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
+ valueMap.put(ITEM_NAMES[3], toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
+ valueMap.put(ITEM_NAMES[4], sessionInfo.state.name());
+ Function<ProgressInfo, CompositeData> fromProgressInfo = new Function<ProgressInfo, CompositeData>()
+ {
+ public CompositeData apply(ProgressInfo input)
+ {
+ return ProgressInfoCompositeData.toCompositeData(planId, input);
+ }
+ };
+ valueMap.put(ITEM_NAMES[5], toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+ valueMap.put(ITEM_NAMES[6], toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static SessionInfo fromCompositeData(CompositeData cd)
+ {
+ assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+
+ Object[] values = cd.getAll(ITEM_NAMES);
+ InetAddress peer;
+ try
+ {
+ peer = InetAddress.getByName((String) values[1]);
+ }
+ catch (UnknownHostException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ Function<CompositeData, StreamSummary> toStreamSummary = new Function<CompositeData, StreamSummary>()
+ {
+ public StreamSummary apply(CompositeData input)
+ {
+ return StreamSummaryCompositeData.fromCompositeData(input);
+ }
+ };
+ SessionInfo info = new SessionInfo(peer,
+ fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
+ fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
+ StreamSession.State.valueOf((String) values[4]));
+ Function<CompositeData, ProgressInfo> toProgressInfo = new Function<CompositeData, ProgressInfo>()
+ {
+ public ProgressInfo apply(CompositeData input)
+ {
+ return ProgressInfoCompositeData.fromCompositeData(input);
+ }
+ };
+ for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[5], toProgressInfo))
+ {
+ info.updateProgress(progress);
+ }
+ for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
+ {
+ info.updateProgress(progress);
+ }
+ return info;
+ }
+
+ private static <T> Collection<T> fromArrayOfCompositeData(CompositeData[] cds, Function<CompositeData, T> func)
+ {
+ return Lists.newArrayList(Iterables.transform(Arrays.asList(cds), func));
+ }
+
+ private static <T> CompositeData[] toArrayOfCompositeData(Collection<T> toConvert, Function<T, CompositeData> func)
+ {
+ CompositeData[] composites = new CompositeData[toConvert.size()];
+ return Lists.newArrayList(Iterables.transform(toConvert, func)).toArray(composites);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
new file mode 100644
index 0000000..f8c54ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamEventJMXNotifier.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.concurrent.atomic.AtomicLong;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
+
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamEventJMXNotifier extends NotificationBroadcasterSupport implements StreamEventHandler
+{
+ private final AtomicLong seq = new AtomicLong();
+
+ public void handleStreamEvent(StreamEvent event)
+ {
+ Notification notif = null;
+ switch (event.eventType) {
+ case STREAM_PREPARED:
+ notif = new Notification(StreamEvent.SessionPreparedEvent.class.getCanonicalName(),
+ StreamManagerMBean.OBJECT_NAME,
+ seq.getAndIncrement());
+ notif.setUserData(SessionInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.SessionPreparedEvent) event).session));
+ break;
+ case STREAM_COMPLETE:
+ notif = new Notification(StreamEvent.SessionCompleteEvent.class.getCanonicalName(),
+ StreamManagerMBean.OBJECT_NAME,
+ seq.getAndIncrement());
+ notif.setUserData(SessionCompleteEventCompositeData.toCompositeData((StreamEvent.SessionCompleteEvent) event));
+ break;
+ case FILE_PROGRESS:
+ notif = new Notification(StreamEvent.ProgressEvent.class.getCanonicalName(),
+ StreamManagerMBean.OBJECT_NAME,
+ seq.getAndIncrement());
+ notif.setUserData(ProgressInfoCompositeData.toCompositeData(event.planId, ((StreamEvent.ProgressEvent) event).progress));
+ break;
+ }
+ sendNotification(notif);
+ }
+
+ public void onSuccess(StreamState result)
+ {
+ Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".success",
+ StreamManagerMBean.OBJECT_NAME,
+ seq.getAndIncrement());
+ notif.setUserData(StreamStateCompositeData.toCompositeData(result));
+ sendNotification(notif);
+ }
+
+ public void onFailure(Throwable t)
+ {
+ Notification notif = new Notification(StreamEvent.class.getCanonicalName() + ".failure",
+ StreamManagerMBean.OBJECT_NAME,
+ seq.getAndIncrement());
+ notif.setUserData(t.fillInStackTrace().toString());
+ sendNotification(notif);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
new file mode 100644
index 0000000..820a71a
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamStateCompositeData.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.*;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Function;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.streaming.SessionInfo;
+import org.apache.cassandra.streaming.StreamState;
+
+/**
+ */
+public class StreamStateCompositeData
+{
+ private static final String[] ITEM_NAMES = new String[]{"planId", "description", "sessions"};
+ private static final String[] ITEM_DESCS = new String[]{"Plan ID of this stream",
+ "Stream plan description",
+ "Active stream sessions"};
+ private static final OpenType<?>[] ITEM_TYPES;
+
+ public static final CompositeType COMPOSITE_TYPE;
+ static {
+ try
+ {
+ ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+ SimpleType.STRING,
+ ArrayType.getArrayType(SessionInfoCompositeData.COMPOSITE_TYPE)};
+ COMPOSITE_TYPE = new CompositeType(StreamState.class.getName(),
+ "StreamState",
+ ITEM_NAMES,
+ ITEM_DESCS,
+ ITEM_TYPES);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static CompositeData toCompositeData(final StreamState streamState)
+ {
+ Map<String, Object> valueMap = new HashMap<>();
+ valueMap.put(ITEM_NAMES[0], streamState.planId.toString());
+ valueMap.put(ITEM_NAMES[1], streamState.description);
+
+ CompositeData[] sessions = new CompositeData[streamState.sessions.size()];
+ Lists.newArrayList(Iterables.transform(streamState.sessions, new Function<SessionInfo, CompositeData>()
+ {
+ public CompositeData apply(SessionInfo input)
+ {
+ return SessionInfoCompositeData.toCompositeData(streamState.planId, input);
+ }
+ })).toArray(sessions);
+ valueMap.put(ITEM_NAMES[2], sessions);
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static StreamState fromCompositeData(CompositeData cd)
+ {
+ assert cd.getCompositeType().equals(COMPOSITE_TYPE);
+ Object[] values = cd.getAll(ITEM_NAMES);
+ UUID planId = UUID.fromString((String) values[0]);
+ String description = (String) values[1];
+ Set<SessionInfo> sessions = Sets.newHashSet(Iterables.transform(Arrays.asList((CompositeData[]) values[2]),
+ new Function<CompositeData, SessionInfo>()
+ {
+ public SessionInfo apply(CompositeData input)
+ {
+ return SessionInfoCompositeData.fromCompositeData(input);
+ }
+ }));
+ return new StreamState(planId, description, sessions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
new file mode 100644
index 0000000..e93069c
--- /dev/null
+++ b/src/java/org/apache/cassandra/streaming/management/StreamSummaryCompositeData.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming.management;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import javax.management.openmbean.*;
+
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.streaming.StreamSummary;
+
+/**
+ */
+public class StreamSummaryCompositeData
+{
+ private static final String[] ITEM_NAMES = new String[]{"cfId",
+ "files",
+ "totalSize"};
+ private static final String[] ITEM_DESCS = new String[]{"ColumnFamilu ID",
+ "Number of files",
+ "Total bytes of the files"};
+ private static final OpenType<?>[] ITEM_TYPES = new OpenType[]{SimpleType.STRING,
+ SimpleType.INTEGER,
+ SimpleType.LONG};
+
+ public static final CompositeType COMPOSITE_TYPE;
+ static {
+ try
+ {
+ COMPOSITE_TYPE = new CompositeType(StreamSummary.class.getName(),
+ "StreamSummary",
+ ITEM_NAMES,
+ ITEM_DESCS,
+ ITEM_TYPES);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static CompositeData toCompositeData(StreamSummary streamSummary)
+ {
+ Map<String, Object> valueMap = new HashMap<>();
+ valueMap.put(ITEM_NAMES[0], streamSummary.cfId.toString());
+ valueMap.put(ITEM_NAMES[1], streamSummary.files);
+ valueMap.put(ITEM_NAMES[2], streamSummary.totalSize);
+ try
+ {
+ return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
+ }
+ catch (OpenDataException e)
+ {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ public static StreamSummary fromCompositeData(CompositeData cd)
+ {
+ Object[] values = cd.getAll(ITEM_NAMES);
+ return new StreamSummary(UUID.fromString((String) values[0]),
+ (int) values[1],
+ (long) values[2]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a0d6ed12/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 13624a2..0da2944 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -32,11 +32,14 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import javax.management.*;
+import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
+import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
@@ -52,6 +55,7 @@ import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.service.*;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.streaming.StreamManagerMBean;
+import org.apache.cassandra.streaming.management.StreamStateCompositeData;
import org.apache.cassandra.utils.SimpleCondition;
/**
@@ -547,7 +551,13 @@ public class NodeProbe
public Set<StreamState> getStreamStatus()
{
- return streamProxy.getCurrentStreams();
+ return Sets.newHashSet(Iterables.transform(streamProxy.getCurrentStreams(), new Function<CompositeData, StreamState>()
+ {
+ public StreamState apply(CompositeData input)
+ {
+ return StreamStateCompositeData.fromCompositeData(input);
+ }
+ }));
}
public String getOperationMode()